Research Project: Adaptive ML for Behavioral Trading Signals¶

Daily S&P 500 Equities (2014–2024)¶

X-HEC Double Degree Data & Finance¶


Auguste PIROMALLI (auguste.piromalli@hec.edu)
Nadiy ABDEL KARIM DIT ARAMOUNI (nadiy.abdel-karim-dit-aramouni@hec.edu)


In this notebook we perform the following:

  • Data ingest & QC (features + close for 50+ long-term constituents).
  • Target engineering for multi-horizon forward returns (default: 5D and 21D).
  • Feature pre-lag (as-of)
  • Cross-sectional constant feature filter
  • Walk-forward CV (expanding window, monthly steps)
  • Regularized linear + tree models
  • Cross-sectional portfolio construction (long-only or long–short) with turnover & costs.
  • Rank IC (+ sanity probes)
  • Backtest analytics (CAGR, Sharpe, Sortino, max DD, turnover, coverage).


1 - Imports & parameters¶

This section sets the foundations for data handling, modeling, and portfolio simulation.

In [1]:
# --- Setup ---
import os, re, math, json, warnings
import random
from pathlib import Path
import numpy as np
import pandas as pd
from dataclasses import dataclass
from typing import List, Tuple, Iterable, Dict
import matplotlib.pyplot as plt
import itertools

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, QuantileTransformer
from sklearn.impute import SimpleImputer
from sklearn.linear_model import ElasticNet
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.model_selection import RandomizedSearchCV
from sklearn.metrics import r2_score, mean_squared_error
from sklearn.base import clone
from sklearn.linear_model import Ridge, ElasticNet, HuberRegressor
from sklearn.ensemble import HistGradientBoostingRegressor


warnings.filterwarnings('ignore')

# ----- Paths & dates -----
DATA_ROOT = Path('data/tickers')
CACHE_PARQUET = Path('data/equity_equitydata.parquet')
START_DATE = '2014-01-01'
END_DATE   = '2024-12-31'


# ---- Signals / targets ----
# Modeling target(s): forward log returns horizons (in trading days)
TARGET_HORIZONS = [5, 21]      # trading days, 1-week and ~1-month
MAX_H = max(TARGET_HORIZONS)   # used to embargo tail when training


# Walk-forward CV settings
TRAIN_YEARS   = 3      # expanding window starts at 3 years
STEP_MONTHS   = 1      # move forward one month between folds
EMBARGO_DAYS  =  MAX_H     #  gap between train and test windows

# Portfolio & costs
REBALANCE_FREQ   = 'W-FRI'  # 'W-FRI' or 'M' for monthly
LONG_SHORT       = True    # True -> long top q & short bottom q; False -> long-only
TOP_QUANTILE     = 0.2      # cross-sectional top quantile to hold
HOLDING_PERIOD   = 5        # days to hold each signal
TCOST_BPS        = 20        # transaction cost in basis points per turnover

# Master toggle: run multi-variant sweep? 
USE_VARIANT_SWEEP = False   # True only when exploring

#  Production model used by the whole notebook
PROD_MODEL_KIND   = 'ridge' #{"ridge","elasticnet","huber","hgbt"}
PROD_MODEL_PARAMS = {}

# Single source of truth for all earlier sections (_fitpredict_one_horizon, etc.)
MODEL_KIND, MODEL_PARAMS = PROD_MODEL_KIND, PROD_MODEL_PARAMS

# ---- Misc ----
DTYPE = np.float32
RANDOM_STATE = 42
random.seed(RANDOM_STATE)
np.random.seed(RANDOM_STATE)
os.environ["PYTHONHASHSEED"] = str(RANDOM_STATE)

os.chdir(Path().resolve()) 
C:\Users\nadiy\anaconda3\Lib\site-packages\pandas\core\arrays\masked.py:61: UserWarning: Pandas requires version '1.3.6' or newer of 'bottleneck' (version '1.3.5' currently installed).
  from pandas.core import (

Risk Control Parameters¶

Define constraints and toggles for portfolio construction: liquidity filters, beta-neutralization, position caps, and leverage settings.
These guard against illiquid names, unintended factor exposures, and concentration risks.

In [2]:
# --- Risk-control toggles & params ---
USE_ADV_ELIGIBILITY = True    # True -> drop illiquid names by ADV percentile
ADV_LOOKBACK_DAYS   = 60       # rolling days for ADV
ADV_PCT             = 0.20     # keep names above this cross-sec percentile each date

USE_BETA_NEUTRAL    = False    # True -> neutralize scores vs rolling beta each date
BETA_LOOKBACK_DAYS  = 252      # rolling days for beta estimation

MIN_XS_NAMES        = 10       # guard for per-date regressions
RIDGE_EPS           = 1e-12    # small ridge for numerical safety

USE_PER_NAME_CAP   = True     # clip per-name weights at abs cap, then renormalize
PER_NAME_CAP       = 0.02     # e.g.: 2% max per name (long-only)

BETA_NEUTRAL_STRENGTH = 1.0   # 1.0 full, 0.5 half, 0 off = skip overlay

# target gross exposure (sum |w|). With LONG_SHORT=True, make_positions already splits 50/50 long & short.
GROSS_LEVERAGE   = 1.0       # set 2.0 for “1 long + 1 short” gross

Plotting Helpers¶

This section defines reusable plotting functions to visualize model performance and signal quality.

Functions Overview¶

  • plot_equity_curve(eq, title)
    Plots the cumulative equity curve of a single strategy over time.
    Useful to quickly visualize performance evolution.

  • plot_rolling_ic(ic, window, title)
    Plots the rolling mean of the Information Coefficient (IC), which measures the rank correlation between model predictions and realized returns.
    Indicates whether the signal has stable predictive power through time.

  • lead_lag_ic_profile_plotly(pred_df, df_prices, horizons, title, compute_rank_ic_fn)
    Computes and plots IC across different lead/lag horizons.

    • Positive horizons → predictive power for future returns.
    • Negative horizons → check for look-ahead bias.
      This is a key diagnostic for signal validity.
  • bar_ic_means(ic_dict, title)
    Plots average IC values for each horizon or lag, making it easy to compare signals.

  • multi_equity_plot(eq_dict, title)
    Compares equity curves from multiple strategies in one chart (e.g., baseline ML vs RL).
    Helps assess relative robustness and performance.

In [3]:
# --- Plotly helpers ---
import plotly.graph_objects as go
import plotly.io as pio
from IPython.display import display, HTML, clear_output

pd.options.display.float_format = '{:.4f}'.format
pio.templates.default = "simple_white"  

# Toggle: False to fall back to Matplotlib for any plot
USE_PLOTLY = True

def _fig_size(w=900, h=350): return dict(width=w, height=h)

def plot_equity_curve(eq: pd.Series, title="Strategy Equity Curve"):
    if not USE_PLOTLY:
        ax = eq.plot(figsize=(10,4), title=title); ax.set_ylabel("Equity"); return
    fig = go.Figure([go.Scatter(x=eq.index, y=eq.values, mode="lines")])
    fig.update_layout(title=title, yaxis_title="Equity", showlegend=False, **_fig_size())
    fig.update_xaxes(showgrid=False)
    fig.show()

def plot_rolling_ic(ic: pd.Series, window=60, title=None):
    roll = ic.rolling(window).mean()
    title = title or f"{window}D Rolling Rank IC"
    if not USE_PLOTLY:
        ax = roll.plot(figsize=(10,3), title=title); ax.axhline(0, lw=1); ax.set_ylabel("IC"); return
    fig = go.Figure([go.Scatter(x=roll.index, y=roll.values, mode="lines")])
    fig.add_hline(y=0, line_width=1)
    fig.update_layout(title=title, yaxis_title="IC", showlegend=False, **_fig_size())
    fig.show()

def lead_lag_ic_profile_plotly(
    pred_df,
    df_prices,
    horizons=range(-15,16),
    title="Lead/Lag IC profile (H in days)",
    compute_rank_ic_fn=None,   
):
    
    if compute_rank_ic_fn is None:
        compute_rank_ic_fn = globals().get("compute_rank_ic", None)
    if compute_rank_ic_fn is None:
        raise NameError("compute_rank_ic is not defined yet; define it or pass as compute_rank_ic_fn=...")

    prof = {}
    for h in progress(list(horizons), total=len(list(horizons)),
                      desc="Lead/Lag IC"):
        ic_h = compute_rank_ic_fn(pred_df, df_prices, horizon=h, min_names=10, demean=True)
        prof[h] = float(ic_h.mean()) if len(ic_h) else np.nan

    s = pd.Series(prof, name="IC")
    if not USE_PLOTLY:
        ax = s.plot(kind="bar", figsize=(10,3), title=title); ax.axhline(0, lw=1); ax.set_ylabel("IC"); return s
    fig = go.Figure([go.Bar(x=[int(i) for i in s.index], y=s.values)])
    fig.add_hline(y=0, line_width=1)
    fig.update_layout(title=title, yaxis_title="IC", showlegend=False, **_fig_size(950,300))
    fig.show()
    return s


def bar_ic_means(ic_dict: dict, title="Per-lag mean IC"):
    s = pd.Series(ic_dict).sort_index()
    if not USE_PLOTLY:
        ax = s.plot(kind="bar", figsize=(10,3), title=title); ax.axhline(0, lw=1); ax.set_ylabel("IC"); return s
    fig = go.Figure([go.Bar(x=[str(k) for k in s.index], y=s.values)])
    fig.add_hline(y=0, line_width=1)
    fig.update_layout(title=title, yaxis_title="IC", showlegend=False, **_fig_size(700,300))
    fig.show()
    return s

def multi_equity_plot(eq_dict: dict, title="Equity curves"):
    if not USE_PLOTLY:
        ax = pd.concat(eq_dict, axis=1).plot(figsize=(10,4), title=title); ax.set_ylabel("Equity"); return
    fig = go.Figure()
    for name, s in eq_dict.items():
        fig.add_trace(go.Scatter(x=s.index, y=s.values, mode="lines", name=name))
    fig.update_layout(title=title, yaxis_title="Equity", **_fig_size(950,380))
    fig.show()

Progress Bars¶

To monitor long-running loops and model training, we enable tqdm progress bars and create two helper functions

In [4]:
# --- Progress bars -----------------------
from contextlib import nullcontext

USE_TQDM = True  # master toggle for all progress bars

try:
    from tqdm.auto import tqdm as _tqdm        
    try:
        from tqdm.contrib import tqdm_joblib as _tqdm_joblib  
    except Exception:
        _tqdm_joblib = None
except Exception:
    _tqdm = None
    _tqdm_joblib = None

def progress(iterable, **kwargs):
    """
    Serial loops: wrap any iterable with a tqdm bar when available and enabled.
    Falls back to identity iterator (no bar).
    """
    if USE_TQDM and _tqdm is not None:
        kwargs.setdefault("dynamic_ncols", True)
        kwargs.setdefault("leave", False)
        return _tqdm(iterable, **kwargs)
    return iterable

def joblib_progress(total: int, desc: str = "Working"):
    """
    Parallel loops (joblib.Parallel): return a context manager that
    shows a single tqdm bar while tasks run. Falls back to a no-op.
    Usage:
        with joblib_progress(len(tasks), "Model variants"):
            results = Parallel(...)(...)
    """
    if USE_TQDM and (_tqdm is not None) and (_tqdm_joblib is not None):
        return _tqdm_joblib(_tqdm(total=total, desc=desc, dynamic_ncols=True, leave=True))
    return nullcontext()

Data Loader Utilities¶

This section defines helper functions for loading and standardizing raw ticker data.
The goal is to normalize column names, ensure consistent formatting (dates, prices, volumes), and efficiently cache results.

Functions¶

  • sanitize_columns(cols)
    Cleans messy column names by removing invisible characters, replacing special symbols (%, /, -, etc.), and enforcing unique, safe identifiers.
    Example: "Adj Close (%)" → "Adj_Close_pct"

  • _infer_ticker_from_path(path)
    Extracts the ticker symbol from a file name (e.g., "ABT_technical_cleaned.csv" → "ABT").
    Fallback: uses the parent folder name if the filename is ambiguous.

  • load_csv(path)
    Loads a single ticker CSV into a standardized DataFrame.
    Steps:

    1. Sanitize column names.
    2. Parse and normalize dates (Date or Datetime).
    3. Standardize price/volume column names (close, volume).
    4. Convert all numeric columns to consistent type (DTYPE, here float32).
    5. Set the date as index, drop duplicates, and attach the ticker symbol.
  • load_all(root, force=False)
    Recursively loads all CSVs from the given root folder:

    • If a cached parquet file (CACHE_PARQUET) exists and force=False, load from it directly for speed.
    • Otherwise, load each CSV with load_csv(), concatenate, and save to parquet.
    • Returns a single time-indexed DataFrame covering all tickers.
In [5]:
# --- Loader utilities ---

def sanitize_columns(cols):
    """Normalize weird characters and make safe, unique column names."""
    out = []
    for c in cols:
        c2 = re.sub(r'[\u200b-\u200f\u202a-\u202e]', '', c)  # zero-width & bidi
        c2 = (c2.replace('%', 'pct').replace('/', '_').replace('\\', '_')
                  .replace('(', '_').replace(')', '_').replace(',', '_')
                  .replace(' ', '_').replace('-', '_'))
        c2 = re.sub(r'__+', '_', c2)
        c2 = re.sub(r'[^0-9A-Za-z_]', '', c2).strip('_')
        out.append(c2)
    # de-duplicate while preserving order
    seen, uniq = {}, []
    for c in out:
        if c not in seen:
            seen[c] = 0; uniq.append(c)
        else:
            seen[c] += 1; uniq.append(f"{c}_{seen[c]}")
    return uniq


def _infer_ticker_from_path(path: Path) -> str:
    """ABT_technical_cleaned.csv -> ABT; fallback to parent folder name."""
    stem = path.stem
    tkr = stem.split('_', 1)[0].upper() if '_' in stem else path.parent.name.upper()
    return tkr


def load_csv(path: Path) -> pd.DataFrame:
    """Load one ticker CSV with a consistent schema."""
    df = pd.read_csv(path)
    df.columns = sanitize_columns(df.columns)

    # date parse & normalize (all files share the same layout; expect 'Date' or 'date')
    date_col_candidates = [c for c in df.columns if c.lower() in ('date', 'datetime')]
    assert date_col_candidates, f"No date column found in {path.name}"
    dcol = date_col_candidates[0]

    df[dcol] = pd.to_datetime(df[dcol], errors='coerce', utc=True)
    df = df.dropna(subset=[dcol]).sort_values(dcol)
    df[dcol] = df[dcol].dt.tz_localize(None).dt.normalize()

    # rename canonical price/volume (files already cleaned, so this is just standardization)
    if 'Close' in df.columns:   # after sanitize, original 'Close' stays 'Close'
        df = df.rename(columns={'Close': 'close'})
    if 'close' not in df.columns and 'Close_1' in df.columns:
        df = df.rename(columns={'Close_1': 'close'})
    if 'Volume' in df.columns:
        df = df.rename(columns={'Volume': 'volume'})

    # numeric coercion for all non-date columns, then cast to DTYPE
    for c in df.columns:
        if c == dcol:
            continue
        if df[c].dtype == 'object':
            df[c] = df[c].astype(str).str.replace(',', '', regex=False)
        df[c] = pd.to_numeric(df[c], errors='coerce').astype(DTYPE, copy=False)

    # index & dedup
    df = df.set_index(dcol)
    df = df[~df.index.duplicated(keep='last')]

    # attach ticker
    df['ticker'] = _infer_ticker_from_path(path)
    return df


def load_all(root: Path, force: bool = False) -> pd.DataFrame:
    """
    Recursively load all CSVs under DATA_ROOT into a single DataFrame,
    cache to parquet for fast reloads.
    """
    if CACHE_PARQUET.exists() and not force:
        return pd.read_parquet(CACHE_PARQUET)

    csvs = sorted(root.rglob('*.csv'))
    if not csvs:
        raise FileNotFoundError(f"No CSV files found under {root.resolve()}")

    frames = [load_csv(p) for p in progress(csvs, desc="Load CSVs")]
    out = pd.concat(frames, axis=0).sort_index()
    out.to_parquet(CACHE_PARQUET)
    return out


2 - Data Loading & Validation¶

Now that our setup is complete and all helper functions are defined, we can load all raw ticker data into a unified DataFrame, restrict to the analysis window, and run basic quality checks.

Steps¶

  1. Load data

    • load_all(DATA_ROOT, force=True) loads all CSVs and forces a fresh parquet build.
    • In normal runs, force=False reuses the parquet cache for faster reload.
  2. Restrict to analysis window

    • Trim dataset to START_DATE : END_DATE (2014–2024).
    • Ensure index is timezone-naive and normalized to daily frequency.
    • Cast ticker to a categorical type (saves memory, safer group ops).
  3. Define feature list

    • base_features: all columns except ticker and close.
    • tickers: list of all available tickers.
  4. Sanity checks & coverage

    • Report total rows, number of tickers, number of raw features, and date span.
    • Print missing value percentages for the worst 10 features.
    • Compute per-ticker coverage (first/last date, row count).
  5. Winsorization helper

    • winsorize_clip() will be used later to clip feature distributions at quantiles (reduce outlier impact).
In [6]:
# Load (force=False in normal runs to use the parquet cache)
df = load_all(DATA_ROOT, force=True)

# Restrict to analysis window and normalize/index guarantees
df = df.loc[START_DATE:END_DATE].sort_index()
assert 'close' in df.columns, "'close' column missing after load"
assert getattr(df.index, "tz", None) is None, "Index should be tz-naive"
df.index = pd.to_datetime(df.index).normalize()   # idempotent
df.index.name = "date" 
df['ticker'] = df['ticker'].astype('category')    # memory + safer ops

# Feature list BEFORE targets are added (keeps naming clear later)
base_features = [c for c in df.columns if c not in ['ticker', 'close']]
tickers = df['ticker'].cat.categories.tolist()

print(f"Rows: {len(df):,} | Tickers: {len(tickers)} | Raw features: {len(base_features)}")
print("Span:", df.index.min(), "→", df.index.max())


miss = df[base_features].isna().mean().sort_values(ascending=False)
print("NaN% (worst 10)\n", miss.head(10))

# coverage sanity per ticker
cov = (df.reset_index()                           
         .groupby('ticker', as_index=False)
         .agg(first_date=('date', 'min'),
              last_date =('date', 'max'),
              rows      =('date', 'size'))
         .sort_values('rows', ascending=False))

print("\nPer-ticker coverage (top 10 by rows):")
print(cov.head(10))


# Winsorization helper (used later inside pipelines)
def winsorize_clip(X: pd.DataFrame, lo=0.005, hi=0.995):
    ql, qh = X.quantile(lo), X.quantile(hi)
    return X.clip(lower=ql, upper=qh, axis=1)
Load CSVs:   0%|                                                                               | 0/650 [00:00<…
Rows: 164,404 | Tickers: 60 | Raw features: 81
Span: 2014-01-01 00:00:00 → 2024-12-30 00:00:00
NaN% (worst 10)
 Visitor_Trend                    0.3558
MA_Neg_Vol_255_ma                0.0199
KlingerSignal_Klinger_13_34_55   0.0155
Klinger_Klinger_13_34_55         0.0154
Klinger_13_34_55_hist            0.0154
Result_Vol_ROC_14                0.0076
Result_Chaikin_Vol_14_2_ma       0.0075
Result_Mass_Idx_25_27            0.0072
Result_Intraday_Mtm_20           0.0070
Result_M_Flow_14                 0.0049
dtype: float64

Per-ticker coverage (top 10 by rows):
   ticker first_date  last_date  rows
40    AVB 2014-01-01 2024-12-30  2785
48    BAX 2014-01-01 2024-12-30  2785
43   AXON 2014-01-01 2024-12-30  2785
0       A 2014-01-01 2024-12-30  2784
44    AXP 2014-01-01 2024-12-30  2784
32    AOS 2014-01-01 2024-12-30  2784
33    APA 2014-01-01 2024-12-30  2784
34    APD 2014-01-01 2024-12-30  2784
35    APH 2014-01-01 2024-12-30  2784
36    APO 2014-01-01 2024-12-30  2784


3 - Target Engineering (Forward Log Returns)¶

We build forward-return targets per ticker for horizons in TARGET_HORIZONS, then keep only rows where at least one target exists. Feature/target separation and basic guards included.

In [7]:
# --- Target engineering ---
def add_targets(d: pd.DataFrame, horizons: List[int]) -> pd.DataFrame:
    d = d.copy()
    d["log_close"] = np.log(d["close"])
    out = []
    n_tkr = d['ticker'].nunique()
    for tkr, g in progress(d.groupby("ticker", group_keys=False),
                           total=n_tkr, desc="Build forward targets"):
        for h in horizons:
            g[f"fwd_ret_{h}d"] = g["log_close"].shift(-h) - g["log_close"]
        out.append(g)

    return pd.concat(out)

# Build forward targets
df = add_targets(df, TARGET_HORIZONS)

# Columns
target_cols  = [f"fwd_ret_{h}d" for h in TARGET_HORIZONS]
exclude_cols = set(["ticker", "close", "log_close"] + target_cols)
feature_cols = [c for c in df.columns if c not in exclude_cols]

# Basic guards
assert len(feature_cols) > 0, "No features left after exclusion!"
assert not (set(feature_cols) & exclude_cols), "Feature/target column overlap."

# Keep rows where at least one target exists (drop only tail per ticker)
df = df[df[target_cols].notna().any(axis=1)].copy()

print("Features after target add:", len(feature_cols))
print("Shape after dropping all-NA target rows:", df.shape)
Build forward targets:   0%|                                                                    | 0/60 [00:00<…
Features after target add: 81
Shape after dropping all-NA target rows: (163987, 86)

We enforce an as-of principle: features used at time t must be known strictly before the prediction target.

Steps¶

  1. Lag features

    • Apply a global 1-day lag (ASOF_LAG_DAYS=1) to all features within each ticker.
    • Prevents lookahead bias (using information from the same day as the return).
  2. Drop incomplete rows

    • Remove any row missing the target or any feature.
  3. Remove unstable features

    • Compute how often features are constant across all tickers per day.
    • If a feature is constant in >80% of days, drop it (little/no cross-sectional info).
  4. Finalize types

    • Cast all features to the chosen numeric dtype (float32).

Output¶

  • df_asof: fully lagged and cleaned dataset.
  • Reports shape, number of features kept, and how many dropped.
In [8]:
# As-of feature freeze (global 1D lag)
ASOF_LAG_DAYS = 1
TARGET = f"fwd_ret_{TARGET_HORIZONS[0]}d"

df_asof = df.copy()
df_asof[feature_cols] = df_asof.groupby('ticker', group_keys=False)[feature_cols].shift(ASOF_LAG_DAYS)

# Drop rows missing either target or any feature
df_asof = df_asof.dropna(subset=[TARGET] + feature_cols, how='any')

# Optional: drop cross-sectionally constant-by-day features (speed + stability)
cs_nuniq = df_asof.groupby(level=0)[feature_cols].nunique()
share_const = (cs_nuniq <= 1).mean(axis=0).sort_values(ascending=False)
CONST_THRESH = 0.80
const_like = share_const[share_const > CONST_THRESH].index.tolist()
feature_cols = [c for c in feature_cols if c not in const_like]
df_asof = df_asof.dropna(subset=feature_cols + [TARGET]).copy()

# dtype
for c in feature_cols:
    df_asof[c] = df_asof[c].astype(DTYPE)

print(f"[asof] df_asof: {df_asof.shape} | kept features: {len(feature_cols)} (dropped {len(const_like)})")
[asof] df_asof: (102915, 86) | kept features: 81 (dropped 0)

Eligibility Lookback & Leakage Safeguards¶

We impose two safeguards before modeling:

1) Minimum history requirement
A name must have at least MIN_HIST_DAYS (=252) days of history after the as-of lag to be eligible for scoring.
This stabilizes rolling stats (e.g., z-scores, betas) and avoids cold-start noise.

2) Leakage drop-list
Certain features are known/assumed to leak future information (directly or via look-ahead preprocessing).
We drop them from both the feature list and the training view to prevent optimistic bias.

Outputs

  • df_asof: filtered to names with sufficient history and without leaky fields.
  • feature_cols: updated to exclude leaky features.
  • Assertions ensure no NAs remain in the modeling view and no dropped feature survives in feature_cols.
In [9]:
# Require a minimum lookback before a name can be scored
MIN_HIST_DAYS = 252
tmp = df_asof.reset_index().rename(columns={"index":"date"}) if df_asof.index.name is None else df_asof.reset_index()
tmp["first_date"] = tmp.groupby("ticker")["date"].transform("min")
tmp["days_live"]  = (tmp["date"] - tmp["first_date"]).dt.days
df_asof = tmp[tmp["days_live"] >= MIN_HIST_DAYS].drop(columns=["first_date","days_live"]).set_index("date")
In [10]:
# --- leakage drop-list (8 features) ---
LEAKY_FEATURES = [
    "Result_Detrended_14_ma",
    "MA_Neg_Vol_255_ma",
    "STARC_Bands_Bottom_STARC_Bands_15_5_13_y",
    "STARC_Bands_Median_STARC_Bands_15_5_13_y",
    "STARC_Bands_Top_STARC_Bands_15_5_13_y",
    "Result_Hist_Vol_10_252_1",
    "PMOSignal_PMO_35_20_10",
    "Gator_13_8_8_5_5_3_hist1",
]

# Drop safely from feature list and training view
existing = [c for c in LEAKY_FEATURES if c in df_asof.columns]
missing  = sorted(set(LEAKY_FEATURES) - set(existing))

feature_cols = [c for c in feature_cols if c not in LEAKY_FEATURES]
df_asof = df_asof.drop(columns=LEAKY_FEATURES, errors="ignore").copy()

print(f"[leak-drop] dropped {len(existing)}/{len(LEAKY_FEATURES)} present → {existing}")
if missing:
    print("[leak-drop] not present (already absent):", missing)

# Safety
assert not (set(feature_cols) & set(LEAKY_FEATURES))
assert df_asof[feature_cols + [TARGET]].notna().all().all(), "Found NA after drop."

print(f"[asof] df_asof now: {df_asof.shape} | kept features: {len(feature_cols)}")
[leak-drop] dropped 8/8 present → ['Result_Detrended_14_ma', 'MA_Neg_Vol_255_ma', 'STARC_Bands_Bottom_STARC_Bands_15_5_13_y', 'STARC_Bands_Median_STARC_Bands_15_5_13_y', 'STARC_Bands_Top_STARC_Bands_15_5_13_y', 'Result_Hist_Vol_10_252_1', 'PMOSignal_PMO_35_20_10', 'Gator_13_8_8_5_5_3_hist1']
[asof] df_asof now: (92911, 78) | kept features: 73


4 - Walk-Forward Cross-Validation Windows¶

We implement walk-forward (expanding window) cross-validation, which is more realistic for time series than random splits.

Key points¶

  • Training window starts with a minimum of TRAIN_YEARS (default = 3 years).
  • Expands forward as time progresses (no shrinking).
  • Testing window = the next calendar month after the embargo period.
  • Embargo = EMBARGO_DAYS ensures no overlap between train and test (avoids leakage from target horizons).
  • Step size = STEP_MONTHS controls how far the training window advances each iteration (default = 1 month).

Outputs¶

  • WFWindow dataclass: stores train/test start and end dates.
  • wf_windows: full list of folds.
  • wf_windows_run: subset of folds (optionally truncated by N_FOLDS_LIMIT).
In [11]:
# Walk-forward windows
from dataclasses import dataclass
from dateutil.relativedelta import relativedelta

@dataclass
class WFWindow:
    train_start: pd.Timestamp
    train_end:   pd.Timestamp
    test_start:  pd.Timestamp
    test_end:    pd.Timestamp

def generate_wf_windows(dates: pd.DatetimeIndex,
                        train_years: int = TRAIN_YEARS,
                        step_months: int = STEP_MONTHS,
                        embargo_days: int = EMBARGO_DAYS):
    """
    Expanding training window; test on the next calendar month.
    Embargo between train_end and test_start to avoid look-ahead via target horizon.
    """
    dmin, dmax   = dates.min(), dates.max()
    start_train  = dmin
    start_end    = start_train + relativedelta(years=train_years) - pd.Timedelta(days=1)
    cursor       = start_end + pd.Timedelta(days=embargo_days)

    while cursor <= dmax - pd.tseries.offsets.MonthEnd(0):
        test_start = (cursor + pd.offsets.MonthBegin(0)).normalize()
        test_end   = (test_start + pd.offsets.MonthEnd(0))
        yield WFWindow(start_train, start_end, test_start, test_end)
        start_end += pd.offsets.MonthEnd(step_months)
        cursor     = start_end + pd.Timedelta(days=embargo_days)

 # Build windows on the AS-OF dataset 
unique_dates = pd.DatetimeIndex(sorted(df_asof.index.unique())).normalize()
wf_windows   = list(generate_wf_windows(unique_dates, TRAIN_YEARS, STEP_MONTHS, EMBARGO_DAYS))

N_FOLDS_LIMIT  = None
wf_windows_run = wf_windows if not N_FOLDS_LIMIT else wf_windows[:N_FOLDS_LIMIT]
print("WF folds:", len(wf_windows_run))
wf_windows_run[:3]
WF folds: 43
Out[11]:
[WFWindow(train_start=Timestamp('2018-06-14 00:00:00'), train_end=Timestamp('2021-06-13 00:00:00'), test_start=Timestamp('2021-08-01 00:00:00'), test_end=Timestamp('2021-08-31 00:00:00')),
 WFWindow(train_start=Timestamp('2018-06-14 00:00:00'), train_end=Timestamp('2021-06-30 00:00:00'), test_start=Timestamp('2021-08-01 00:00:00'), test_end=Timestamp('2021-08-31 00:00:00')),
 WFWindow(train_start=Timestamp('2018-06-14 00:00:00'), train_end=Timestamp('2021-07-31 00:00:00'), test_start=Timestamp('2021-09-01 00:00:00'), test_end=Timestamp('2021-09-30 00:00:00'))]

Walk-Forward Training & Prediction (AS-OF)¶

This section defines the train/test splitter, model pipelines, and two WF runners:

1) _split_wf(df_view, w, embargo_bd, asof_lag)¶

Creates train/test slices for one walk-forward window w:

  • Ceiling guard: training data stops strictly before the first test observation and before any info that could bleed due to the AS-OF lag (asof_lag) and horizon.
  • Embargo: removes the last embargo_bd business days from the effective training end to avoid leakage via target computation.

2) build_pipeline(model)¶

Builds a preprocessing + estimator pipeline:

  • Preprocessing: median imputation → standardization.
  • Models: "elasticnet" (with grid for alpha, l1_ratio) or "ridge" (grid for alpha).
  • Returns (pipeline, param_grid) for tuning.

3) fit_predict_wf_fast_asof(...)¶

Fast baseline using fixed Ridge(alpha) (no CV). For each fold:

  • Split with _split_wf → fit → predict on test month.
  • Returns a concatenated DataFrame with y_pred, ticker, wf_fold, model.

4) fit_predict_wf_asof(...)¶

Tuned variant with RandomizedSearchCV over the model grid using TimeSeriesSplit (3 splits) on the training chunk of each fold.

  • Scoring: neg MSE (minimizes MSE on the target forward return).
  • Progress bar shows n_iter × CV_splits tasks per fold.
  • Returns predictions with best-found hyperparameters per fold.

Notes

  • All inputs must already respect AS-OF lagging and prior filters (df_asof, feature_cols, TARGET).
  • Use wf_windows_run created earlier for consistent fold definitions.
  • For reproducibility, the seeds are fixed and preprocessing is identical across models.
In [12]:
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import Ridge, ElasticNet
from sklearn.model_selection import RandomizedSearchCV, TimeSeriesSplit
from pandas.tseries.offsets import BDay


def _split_wf(df_view, w, embargo_bd=MAX_H, asof_lag=ASOF_LAG_DAYS):
    # Ceiling: stop training before the first test observation and before target horizon
    ceiling = w.test_start - BDay(asof_lag + 1)         # nothing from (t-1) bleeding into test
    train_end_eff = min(w.train_end, ceiling) - BDay(max(1, embargo_bd))
    tr = df_view.loc[(df_view.index >= w.train_start) & (df_view.index <= train_end_eff)]
    te = df_view.loc[(df_view.index >= w.test_start)  & (df_view.index <= w.test_end)]
    return tr, te, train_end_eff

def build_pipeline(model: str = "elasticnet"):
    pre = Pipeline([
        ("impute", SimpleImputer(strategy="median")),
        ("scale",  StandardScaler(with_mean=True, with_std=True)),
    ])
    if model == "elasticnet":
        est = ElasticNet(random_state=RANDOM_STATE, max_iter=2000)
        params = {
            "est__alpha":    np.logspace(-4, 1, 50),
            "est__l1_ratio": np.linspace(0.0, 1.0, 21),
        }
    elif model == "ridge":
        est = Ridge(random_state=RANDOM_STATE)
        params = {"est__alpha": np.logspace(-4, 2, 20)}
    else:
        raise ValueError("model must be 'elasticnet' or 'ridge'")
    return Pipeline([("pre", pre), ("est", est)]), params

def fit_predict_wf_fast_asof(df_view: pd.DataFrame,
                             feature_cols: list,
                             target_col: str,
                             windows=None,
                             alpha: float = 1.0) -> pd.DataFrame:
    if windows is None: windows = wf_windows_run
    preds, pipe = [], Pipeline([
        ("impute", SimpleImputer(strategy="median")),
        ("scale",  StandardScaler()),
        ("est",    Ridge(alpha=alpha, random_state=RANDOM_STATE)),
    ])
    for k, w in enumerate(progress(windows, total=len(windows),
                                   desc=f"WF ridge_fast [{target_col}]"), 1):

        tr, te, tr_end_eff = _split_wf(df_view, w)
        if te.empty: 
            print(f"Fold {k:02d} skipped (te=0)"); 
            continue
        tr = tr.dropna(subset=[target_col])
        X_tr, y_tr = tr[feature_cols], tr[target_col]
        X_te       = te[feature_cols]
        if len(X_tr)==0 or len(X_te)==0:
            print(f"Fold {k:02d} skipped (tr={len(X_tr)}, te={len(X_te)})")
            continue
        pipe.fit(X_tr, y_tr)
        yhat = pipe.predict(X_te)
        out = pd.DataFrame(
            {"ticker": te["ticker"].values, "y_pred": yhat,
             "wf_fold": k, "model": "ridge_fast"},
            index=te.index
        ); out.index.name = "date"
        preds.append(out)
        print(f"Fold {k:02d} | Train {w.train_start.date()}→{pd.Timestamp(tr_end_eff).date()} "
              f"| Test {w.test_start.date()}→{w.test_end.date()} | est={pipe.named_steps['est']}")
    return pd.concat(preds).sort_index()

def fit_predict_wf_asof(df_view: pd.DataFrame,
                        feature_cols: list,
                        target_col: str,
                        model: str = "elasticnet",
                        n_iter: int = 25,
                        windows=None) -> pd.DataFrame:
    if windows is None: windows = wf_windows_run
    pipe, params = build_pipeline(model)
    preds = []
    for k, w in enumerate(progress(windows, total=len(windows),
                                  desc=f"WF {model} [n_iter={n_iter}]"), 1):
        tr, te, tr_end_eff = _split_wf(df_view, w)
        if te.empty:
            print(f"Fold {k:02d} skipped (te=0)"); 
            continue
        tr = tr.dropna(subset=[target_col])
        X_tr, y_tr = tr[feature_cols], tr[target_col]
        X_te       = te[feature_cols]
        cv = TimeSeriesSplit(n_splits=3)
        tuner = RandomizedSearchCV(
        pipe, params, n_iter=n_iter, random_state=RANDOM_STATE,
        n_jobs=-1, cv=cv,
        scoring="neg_mean_squared_error", verbose=0
        )
        
        total_tasks = n_iter * cv.get_n_splits()
        with joblib_progress(total_tasks, desc=f"RS[{model}] fold {k:02d}"):
            tuner.fit(X_tr, y_tr)
        out = te[["ticker"]].copy()
        out["y_pred"]  = tuner.predict(X_te)
        out["wf_fold"] = k
        out["model"]   = model
        out.index.name = "date"
        preds.append(out)
        print(f"Fold {k:02d} | Train {w.train_start.date()}→{pd.Timestamp(tr_end_eff).date()} "
              f"| Test {w.test_start.date()}→{w.test_end.date()} | best={tuner.best_params_}")
    return pd.concat(preds).sort_index()


5 - Portfolio Construction & Backtest Engine¶

This section converts model predictions into portfolio weights, simulates trades, and evaluates performance.

Components¶

  • _cap_and_renorm(row, cap, long_short)
    Applies a per-name weight cap and renormalizes weights:

    • Long/short: cap on both sides, then rescale each side to 50% gross.
    • Long-only: cap, then scale down so total gross ≤ 1.
  • make_positions(signal_df, ...)
    Converts daily model scores (y_pred) into cross-sectional weights:

    1. Normalize scores to a daily matrix.
    2. Resample at rebalance frequency (weekly/monthly).
    3. Rank scores → take top q fraction (and bottom if long/short).
    4. Assign equal weights per side, adjusted for leverage, eligibility masks, and per-name caps.
  • compute_returns_matrix(d)
    Converts (date, ticker, close) into a date × ticker matrix of simple returns, de-duplicating per (date,ticker).

  • backtest(signal_pred, prices_df, ...)
    Simulates trading given predictions and prices:

    • Build rebalance-day positions with make_positions.
    • Forward-fill to daily weights, then lag one day (trade at next day’s open).
    • Approximate multi-day holds with rolling mean of weights.
    • Compute turnover and apply transaction costs.
    • Output daily portfolio returns, turnover, and cumulative equity curve.
  • perf_stats(ts, freq)
    Computes key performance metrics from a return series:

    • CAGR, annualized volatility, Sharpe ratio, max drawdown, sample size.

Notes¶

  • This design enforces as-of trading discipline: trade one day after signal, no lookahead.
  • Turnover-based costs ensure realistic performance attribution.
  • Weights respect liquidity/eligibility and per-name caps from earlier toggles.
In [13]:
def _cap_and_renorm(row: pd.Series, cap: float, long_short: bool) -> pd.Series:
    if cap is None or cap <= 0:
        return row

    w = row.copy()

    if long_short:
        # 1) hard box cap on both sides
        w = w.clip(lower=-cap, upper=cap)

        # 2) side-wise sums after capping
        pos_sum = w[w > 0].sum()
        neg_sum = -w[w < 0].sum()

        # 3) scale DOWN only, never up (keeps the cap hard)
        target_side = 0.5
        if pos_sum > 0:
            w.loc[w > 0] *= min(target_side / pos_sum, 1.0)
        if neg_sum > 0:
            w.loc[w < 0] *= min(target_side / neg_sum, 1.0)

        return w

    else:
        # long-only: hard cap then scale DOWN to gross=1
        w = w.clip(lower=0.0, upper=cap)
        gross = w.sum()
        if gross > 0:
            w *= min(1.0 / gross, 1.0)
        return w


def make_positions(
    signal_df: pd.DataFrame,
    long_short: bool = LONG_SHORT,
    top_q: float = TOP_QUANTILE,
    freq: str = REBALANCE_FREQ,
    gross_leverage: float = 1.0,
    eligibility: pd.DataFrame | None = None,
    per_name_cap: float | None = None,
    debug: bool = False
) -> pd.DataFrame:
    """
    Build cross-sectional weights from daily scores without backfilling future signals.
    Optionally enforce ADV eligibility per date.
    """
    if per_name_cap is None:
        per_name_cap = (globals().get('PER_NAME_CAP') 
                        if globals().get('USE_PER_NAME_CAP', False) else None)
    
    sig = (signal_df[['ticker', 'y_pred']]
           .rename(columns={'y_pred': 'score'})
           .copy())
    if getattr(sig.index, "tz", None) is not None:
        sig.index = sig.index.tz_localize(None)
    sig.index = pd.to_datetime(sig.index).normalize()
    sig = sig.dropna(subset=['score'])

    if debug:
        print("[make_positions] pred rows:", len(sig),
              "| unique dates:", sig.index.nunique(),
              "| range:", sig.index.min(), "->", sig.index.max(),
              "| tickers:", sig['ticker'].nunique())

    scores_daily = (sig.reset_index(names='date')
                      .sort_values(['date','ticker'])
                      .drop_duplicates(['date','ticker'], keep='last')
                      .pivot(index='date', columns='ticker', values='score')
                      .astype(float)
                      .sort_index())

    scores = scores_daily.resample(freq).last().dropna(how="all")

    if debug:
        valid_days = int((scores.notna().sum(axis=1) > 0).sum())
        print("[make_positions] scores shape:", scores.shape,
              "| valid rebalance days:", valid_days)

    # ranks (1=best); build weights with optional eligibility mask
    w = pd.DataFrame(0.0, index=scores.index, columns=scores.columns)
    for dt, row in scores.iterrows():
        r = row.dropna()
        if eligibility is not None and dt in eligibility.index:
            elig_row = eligibility.loc[dt].reindex(r.index)
            r = r[elig_row.fillna(False)]
        if len(r) == 0:
            continue
        k = max(1, int(round(len(r) * top_q)))
        ranks = r.rank(ascending=False, method='first')
        top = ranks.nsmallest(k).index
        if long_short:
            bot = ranks.nlargest(k).index
            w.loc[dt, top] = +gross_leverage / (2*k)
            w.loc[dt, bot] = -gross_leverage / (2*k)
        else:
            w.loc[dt, top] = gross_leverage / k
            
        if per_name_cap is not None and per_name_cap > 0:
            w.loc[dt] = _cap_and_renorm(w.loc[dt], per_name_cap, long_short)    
    return w




def compute_returns_matrix(d: pd.DataFrame) -> pd.DataFrame:
    """
    Convert (date, ticker, close) rows into a date x ticker matrix of SIMPLE returns.
    De-duplicates (date,ticker) by taking the last row.
    """
    x = (d.reset_index(names='date')
           .sort_values(['date','ticker'])
           .drop_duplicates(['date','ticker'], keep='last'))
    px = x.pivot(index='date', columns='ticker', values='close')
    if getattr(px.index, "tz", None) is not None:
        px.index = px.index.tz_localize(None)
    px.index = pd.to_datetime(px.index).normalize()
    rets = px.pct_change().fillna(0.0)  # simple returns
    return rets.sort_index()


def backtest(
    signal_pred: pd.DataFrame,
    prices_df: pd.DataFrame,
    long_short: bool = LONG_SHORT,
    top_q: float = TOP_QUANTILE,
    freq: str = REBALANCE_FREQ,
    holding_period: int = HOLDING_PERIOD,
    tcost_bps: float = TCOST_BPS,
    eligibility: pd.DataFrame | None = None,
    per_name_cap: float | None = None,
    gross_leverage: float = 1.0, 
    debug: bool = False
):
    """
    Align predictions to daily returns, trade next day, and hold each rebalance for
    `holding_period` days via rolling mean of weights (staggered overlap).
    """
    # positions on rebalance calendar
    w_rb = make_positions(signal_pred, long_short=long_short, top_q=top_q,
                          freq=freq, gross_leverage=gross_leverage, eligibility=eligibility, 
                          per_name_cap=per_name_cap, debug=debug)
    # daily returns matrix
    rets = compute_returns_matrix(prices_df)

    # align: forward-fill positions to daily dates, then select only return dates
    weights = (w_rb
               .reindex(w_rb.index.union(rets.index))
               .sort_index()
               .ffill()
               .reindex(rets.index))

    # common universe
    common = rets.columns.intersection(weights.columns)
    if debug:
        print("[backtest] columns | rets:", len(rets.columns),
              "weights:", len(weights.columns), "common:", len(common))
        print("[backtest] weights_w first date:", w_rb.index.min(),
              "| after-align nonzero rows:",
              int((weights[common].abs().sum(axis=1) > 1e-12).sum()))
        if w_rb.index.min() in weights.index:
            print("[backtest] sum |w| at first rebalance:",
                  float(weights.loc[w_rb.index.min(), common].abs().sum()))

    weights = weights[common].fillna(0.0)
    rets    = rets[common].fillna(0.0)

    # enter next day; approximate H-day hold with rolling mean
    w_held = weights.shift(1).rolling(holding_period).mean().fillna(0.0)

    # mask out pre-deployment days (before first nonzero)
    mask_live = (w_held.abs().sum(axis=1) > 1e-12)
    if mask_live.any():
        first_live = mask_live.idxmax()  # first True
        w_held = w_held.loc[first_live:]
        rets   = rets.loc[first_live:]

    # turnover & costs
    turnover = w_held.sub(w_held.shift(1).fillna(0.0)).abs().sum(axis=1)
    cost = turnover * (tcost_bps / 1e4)

    # portfolio simple return
    port_ret = (w_held * rets).sum(axis=1) - cost
    equity   = (1.0 + port_ret.fillna(0.0)).cumprod()

    out = pd.DataFrame({'ret': port_ret, 'turnover': turnover, 'equity': equity})
    out.index.name = 'date'
    return out, w_held


def perf_stats(ts: pd.Series, freq: int = 252) -> dict:
    r = ts.dropna()
    if len(r) == 0:
        return dict(CAGR=np.nan, Vol=np.nan, Sharpe=np.nan, MaxDD=np.nan, N=0)
    cagr   = (1 + r).prod()**(freq/len(r)) - 1
    vol    = r.std() * np.sqrt(freq)
    sharpe = (r.mean() / r.std()) * np.sqrt(freq) if r.std() > 0 else np.nan
    curve  = (1 + r).cumprod()
    maxdd  = (curve / curve.cummax() - 1).min()
    return dict(CAGR=cagr, Vol=vol, Sharpe=sharpe, MaxDD=maxdd, N=len(r))

Risk-Control Helpers: Liquidity Eligibility & Beta Neutralization¶

This section builds two overlays that can be applied during portfolio construction.

1) build_adv_eligibility(df_prices, lookback=60, pct=0.20)¶

Computes an ADV-based eligibility mask (date × ticker → boolean):

  • ADV ≈ rolling mean of close × volume over lookback days.
  • Leak-safe: uses shift(1) so day-t eligibility never uses day-t information.
  • For each date, marks tickers with ADV ≥ cross-sectional pct percentile (default top 20% liquid) as eligible.

2) build_rolling_beta(df_prices, lookback=252)¶

Computes rolling betas of each ticker to an equal-weight market return:

  • Market proxy: cross-sectional mean of simple returns each day.
  • Beta per ticker via rolling covariance/variance over lookback.
  • Leak-safe: shift(1) to use info only up to t-1.

3) neutralize_vs_beta(pred_df, betas, min_names, strength)¶

Removes unintended market beta exposure from daily scores:

  • For each date, regress scores (y_pred) on betas cross-sectionally and take residuals.
  • min_names guards against thin cross-sections; strength (0..1) scales the neutralization (1 = full).
  • Demeans both variables per date; ridge term (RIDGE_EPS) stabilizes low-variance cases.
  • Output preserves (date, ticker, y_pred) with residualized scores.

Usage notes

  • Apply eligibility mask inside make_positions(...) to skip illiquid names.
  • Apply neutralize_vs_beta(...) to pred_df before turning scores into weights if USE_BETA_NEUTRAL=True.
In [14]:
# --- Risk-control helpers (ADV eligibility & beta-neutral overlay) ---

def build_adv_eligibility(df_prices: pd.DataFrame,
                          lookback: int = 60,
                          pct: float = 0.20) -> pd.DataFrame:
    """
    Returns a boolean DataFrame (date x ticker) marking names eligible by ADV percentile.
    Leak-safe: uses rolling mean of (close*volume) shifted by 1 day.
    """
    d = (df_prices.reset_index(names='date')
                  .drop_duplicates(['date','ticker'], keep='last')
                  .pivot(index='date', columns='ticker', values='close'))
    v = (df_prices.reset_index(names='date')
                  .drop_duplicates(['date','ticker'], keep='last')
                  .pivot(index='date', columns='ticker', values='volume'))
    dv  = (d * v).astype(float)
    adv = dv.rolling(lookback).mean().shift(1)  # no day-of info

    # per-date threshold at the requested percentile
    thr = adv.apply(lambda row: np.nanpercentile(row.values, pct*100), axis=1)
    elig = adv.ge(thr, axis=0) & adv.notna()
    elig.index.name = 'date'
    return elig

def build_rolling_beta(df_prices: pd.DataFrame,
                       lookback: int = 252) -> pd.DataFrame:
    """
    Rolling beta vs equal-weight market return. Leak-safe via shift(1).
    Returns DataFrame (date x ticker) of betas.
    """
    px = (df_prices.reset_index(names='date')
                     .drop_duplicates(['date','ticker'], keep='last')
                     .pivot(index='date', columns='ticker', values='close'))
    rets = px.pct_change()
    mkt  = rets.mean(axis=1)  # equal-weight market
    var_m = mkt.rolling(lookback).var()
    betas = pd.DataFrame(index=rets.index, columns=rets.columns, dtype=float)

    # rolling covariance per column vs market
    for c in rets.columns:
        cov_im = rets[c].rolling(lookback).cov(mkt)
        betas[c] = cov_im / (var_m + RIDGE_EPS)

    betas = betas.shift(1)  # use only info up to t-1
    betas.index.name = 'date'
    return betas

def neutralize_vs_beta(pred_df: pd.DataFrame,
                       betas: pd.DataFrame,
                       min_names: int = MIN_XS_NAMES,
                       strength: float = 1.0) -> pd.DataFrame:
    """
    Per-date cross-sectional regression of scores on beta; returns residualized scores.
    Handles duplicate (date,ticker) by keeping the last one.
    """
    s_tidy = (pred_df.reset_index().rename(columns={'index':'date'})
              .sort_values(['date','ticker'])
              .drop_duplicates(['date','ticker'], keep='last'))
    s = s_tidy.pivot_table(index='date', columns='ticker', values='y_pred', aggfunc='last')

    # align betas to score matrix
    b = betas.reindex(s.index).reindex(columns=s.columns)

    def _resid_row(y, x):
        y = y.astype(float).copy()
        x = x.astype(float).copy()
        m = np.isfinite(y) & np.isfinite(x)
        if m.sum() < min_names:
            return y * np.nan
        # demean both; if beta variance is ~0, just return demeaned y
        y0 = y[m] - y[m].mean()
        x0 = x[m] - x[m].mean()
        denom = float(np.dot(x0, x0))
        if denom <= RIDGE_EPS:
            y_res = y.copy()
            y_res[m] = y0
            return y_res
        beta = float(np.dot(x0, y0) / (denom + RIDGE_EPS))
        y_res = y.copy()
        y_res[m] = y0 - strength * beta * x0
        return y_res

    resid = s.combine(b, func=_resid_row, overwrite=False)

    out = (resid.stack(dropna=False)
                 .rename('y_pred')
                 .reset_index()
                 .dropna(subset=['y_pred']))
    return out.set_index('date')[['ticker','y_pred']]


6 - Lag-Ensemble (Train & Test) + Horizon Ensemble¶

This section trains models on walk-forward training sets and evaluates them on test folds, then combines predictions across multiple AS-OF lags (and optionally horizons).

Key steps¶

  1. Train–test loop

    • For each walk-forward window (wf_windows_run):
      • Train the model pipeline on the training slice.
      • Test by predicting on the next month’s test slice.
    • This is repeated independently for each lag (1D, 3D, 5D, 10D).
  2. Lag ensemble

    • Collect predictions from all lags per (date, ticker).
    • Weighting:
      • Train-only IC weights: average daily IC on the training span only, shrunk toward equal weights by IC_SHRINK_LAMBDA.
      • Or fallback: equal weights, or test-set IC weighting if enabled.
  3. (Optional) Horizon ensemble

    • Repeat the train–test lag-ensemble procedure for each horizon (5D, 21D).
    • Combine horizon-level predictions using train-only IC weights (HORIZON_IC_SHRINK_LAMBDA).
  4. Residualization vs 1D return

    • On the combined test predictions, regress out exposure to prior 1-day returns per date.
    • Output: final test-set prediction series PRED_USE with (date, ticker, y_pred).

Why this matters¶

  • Ensures a clean separation between training and testing — no information leakage.
  • Lag ensemble reduces timing risk; train-only IC weighting keeps it out-of-sample.
  • Residualization vs 1D return avoids overfitting to trivial short-term patterns.
In [15]:
from functools import reduce

# --- Train & predict: lag-ensemble + neutralization -------------------------
USE_TRAIN_IC_WEIGHTS   = True   # default off
IC_SHRINK_LAMBDA       = 0.6     # 0=pure IC weights, 1=pure equal weights

USE_FAST = True
TARGET   = f"fwd_ret_{TARGET_HORIZONS[0]}d"

# As-of lags in TRADING days (1 = current baseline).
# LAG_ENS = [1, 3, 5, 10]

LAG_ENS = [1, 3]

# weighting scheme for the ensemble: "equal" or "ic"
# ("ic" will try to compute mean IC per-lag; if compute_rank_ic isn't defined yet,
# it will auto-fallback to equal weights)
WEIGHT_METHOD = "equal"

# --- Horizon ensemble toggles ---
USE_HORIZON_ENSEMBLE   = False    # <— blend 5D + 21D if True
HORIZON_LIST           = [5, 21] # must be subset of TARGET_HORIZONS
HORIZON_IC_SHRINK_LAMBDA = 0.6   # 0 = pure train-IC weights, 1 = equal weights

# --- Model variant toggles ----------------------------------------------------
MODEL_KIND   =  "ridge"      # {"ridge","elasticnet","huber","hgbt"}  (hgbt = HistGBR)
MODEL_PARAMS = {}           # per-model overrides; leave {} for defaults


# default params (used if key missing in MODEL_PARAMS)
MODEL_DEFAULTS = {
    "ridge":      {"alpha": 1.0},
    "elasticnet": {"alpha": 0.5, "l1_ratio": 0.2, "max_iter": 5000},
    "huber":      {"epsilon": 1.35, "alpha": 1e-4, "max_iter": 1000},
    "hgbt":       {"max_depth": 3, "learning_rate": 0.05, "max_iter": 250,
                   "l2_regularization": 0.0, "max_leaf_nodes": 31}
}


def _make_pipeline(kind: str, params: dict):
    p = (MODEL_DEFAULTS.get(kind, {}) | (params or {}))
    steps = [("impute", SimpleImputer(strategy="median"))]

    if kind in {"ridge","elasticnet","huber"}:
        steps.append(("scale", StandardScaler()))

    if kind == "ridge":
        est = Ridge(alpha=float(p["alpha"]), random_state=RANDOM_STATE)
    elif kind == "elasticnet":
        est = ElasticNet(alpha=float(p["alpha"]),
                         l1_ratio=float(p["l1_ratio"]),
                         max_iter=int(p["max_iter"]),
                         random_state=RANDOM_STATE)
    elif kind == "huber":
        est = HuberRegressor(epsilon=float(p["epsilon"]),
                             alpha=float(p["alpha"]),
                             max_iter=int(p["max_iter"]))
    elif kind == "hgbt":
        est = HistGradientBoostingRegressor(
            max_depth=int(p["max_depth"]),
            learning_rate=float(p["learning_rate"]),
            max_iter=int(p["max_iter"]),
            l2_regularization=float(p["l2_regularization"]),
            max_leaf_nodes=int(p["max_leaf_nodes"]),
            random_state=RANDOM_STATE)
    else:
        raise ValueError(f"Unknown MODEL_KIND={kind}")

    return Pipeline(steps + [("est", est)])


def fit_predict_wf_with_pipe(df_asof, feature_cols, target_col, windows, pipe) -> pd.DataFrame:
    """
    Run your existing walk-forward windows with a custom sklearn Pipeline.

    This is robust to different _split_wf return signatures by:
    - calling _split_wf(df_asof, w)
    - extracting the first and last DataFrame-like objects as train/test
    """
    out = []

    for w in windows:
        parts = _split_wf(df_asof, w)

        dfs = [p for p in parts if isinstance(p, pd.DataFrame)]
        if not dfs:
            raise ValueError(f"_split_wf returned no DataFrames for window {w!r}: {type(parts)=}")

        # first DF is train, last DF is test
        tr_df = dfs[0]
        te_df = dfs[-1] if len(dfs) >= 2 else None

        if te_df is None or getattr(te_df, "empty", True):
            continue

        pp = clone(pipe)
        pp.fit(tr_df[feature_cols], tr_df[target_col])
        yhat = pp.predict(te_df[feature_cols])

        fold = pd.DataFrame(
            {"ticker": te_df["ticker"].values, "y_pred": yhat},
            index=te_df.index
        )
        fold.index.name = "date"
        out.append(fold)

    if not out:
        return pd.DataFrame(columns=["ticker","y_pred"])

    return (pd.concat(out).sort_index()
            .reset_index().set_index("date")[["ticker","y_pred"]].sort_index())


def _compute_rank_ic_quick(pred_df: pd.DataFrame,
                           df_prices: pd.DataFrame,
                           horizon: int = 5,
                           min_names: int = 10,
                           demean: bool = True) -> pd.Series:
    # predictions → tidy
    p = (pred_df[['ticker','y_pred']]
         .rename(columns={'y_pred':'score'})
         .rename_axis('date').reset_index())
    p['date'] = pd.to_datetime(p['date']).dt.normalize()
    p = p[['date','ticker','score']]

    # prices → wide, normalized index
    px = (df_prices.reset_index(names='date')
            .sort_values(['date','ticker'])
            .drop_duplicates(['date','ticker'], keep='last')
            .pivot(index='date', columns='ticker', values='close')
            .astype(float).sort_index())

    # realized forward simple returns
    if horizon > 0:
        fwd = px.pct_change(horizon).shift(-horizon)
    elif horizon < 0:
        H = abs(horizon); fwd = px.pct_change(H)
    else:
        fwd = px * 0.0

    real = (fwd.stack().rename('fwd')
              .reset_index().rename(columns={'level_1':'ticker'}))

    ic_df = p.merge(real, on=['date','ticker'], how='left')

    if demean:
        ic_df['fwd']   = ic_df.groupby('date')['fwd'].transform(lambda x: x - x.mean())
        ic_df['score'] = ic_df.groupby('date')['score'].transform(lambda x: x - x.mean())

    def _daily_ic(g):
        g = g.dropna(subset=['score','fwd'])
        if len(g) < min_names: return np.nan
        return g[['score','fwd']].corr(method='spearman').iloc[0,1]

    ic = ic_df.groupby('date', sort=True).apply(_daily_ic).dropna()
    ic.index.name = 'date'
    ic.name = f'IC_{horizon}d'
    return ic


def _asof_extra_lag(df_base, cols, extra):
    """Apply an extra per-ticker shift to features only (keeps TARGET intact)."""
    if extra <= 0:
        return df_base
    d = df_base.copy()
    d[cols] = d.groupby('ticker', group_keys=False)[cols].shift(extra)
    return d.dropna(subset=cols + [TARGET])


def _neutralize_vs_ret1_quick(pred_df, df_prices):
    ret1 = df_prices[['ticker','close']].groupby('ticker')['close'].pct_change(1).rename('ret_1d')
    ret1 = (df_prices.assign(ret_1d=ret1)
            .assign(date=lambda x: x.index)
            .reset_index(drop=True)[['date','ticker','ret_1d']])
    sc = pred_df.reset_index().rename(columns={'index':'date'})
    m  = sc.merge(ret1, on=['date','ticker'], how='left')
    def _res(g):
        x, y = g['ret_1d'].to_numpy(), g['y_pred'].to_numpy()
        mask = np.isfinite(x) & np.isfinite(y)
        if mask.sum() < 10: g['y_pred'] = np.nan; return g
        beta = np.dot(x[mask], y[mask]) / (np.dot(x[mask], x[mask]) + 1e-12)
        g.loc[mask, 'y_pred'] = y[mask] - beta * x[mask]
        return g
    return (m.groupby('date', group_keys=False).apply(_res)
              .dropna(subset=['y_pred'])
              .set_index('date')[['ticker','y_pred']])

def compute_train_ic_weights(df_asof, feature_cols, target_col, lags, df_prices,
                             use_fast=True, alpha=1.0,
                             neutralize_ret1=True, shrink_lambda=0.5,
                             model_kind: str = None, model_params: dict | None = None):

    # build train window = expanding start .. first fold's effective train_end
    first_w = wf_windows_run[0]
    tr, _, _ = _split_wf(df_asof, first_w)
    if tr.empty:
        return {L: 1.0/len(lags) for L in lags}

    # fit once per lag on TRAIN ONLY, score train, neutralize, compute daily Spearman IC
    ic_means = {}
    px_wide = (df_prices.reset_index(names='date')
             .sort_values(['date','ticker'])
             .drop_duplicates(['date','ticker'], keep='last')
             .pivot(index='date', columns='ticker', values='close')
             .astype(float)
             .sort_index())

    for L in progress(lags, total=len(lags), desc="Train IC by lag"):
        dL = _asof_extra_lag(df_asof, feature_cols, extra=L-1)
        dL = dL.sort_index()  # safe no-op if already sorted
        tr_start, tr_end = tr.index.min(), tr.index.max()
        trL = dL[(dL.index >= tr_start) & (dL.index <= tr_end)]
        if trL.empty:
            ic_means[L] = 0.0
            continue

        pipe = _make_pipeline(model_kind or MODEL_KIND, model_params or MODEL_PARAMS)


        X_tr, y_tr = trL[feature_cols], trL[target_col]
        pipe.fit(X_tr, y_tr)
        yhat = pipe.predict(X_tr)

        pred = pd.DataFrame({"ticker": trL["ticker"].values, "y_pred": yhat}, index=trL.index)
        pred.index.name = "date"
        if neutralize_ret1:
            pred = _neutralize_vs_ret1_quick(pred, df_prices)

        # realized fwd simple returns on train
        H = int(target_col.split('_')[2].rstrip('d')) 
        if pred.empty:
            ic_means[L] = 0.0
            continue

        # ensure pred dates are normalized
        pred.index = pd.to_datetime(pred.index).normalize()

        start, end = pred.index.min(), pred.index.max()
        px_tr = px_wide[(px_wide.index >= start) & (px_wide.index <= end)]

        fwd = px_tr.pct_change(H).shift(-H)

        # tidy merge
        sc   = pred.reset_index().rename(columns={'index':'date'})
        real = (fwd.stack().rename('fwd')
                    .reset_index()
                    .rename(columns={'level_1': 'ticker'}))
        ic_df = sc.merge(real, on=['date','ticker'], how='left')


        def _daily_ic(g):
            g = g.dropna(subset=['y_pred','fwd'])
            if len(g) < 10: return np.nan
            return g[['y_pred','fwd']].corr(method='spearman').iloc[0,1]

        ic = ic_df.groupby('date', sort=True).apply(_daily_ic).dropna()
        ic_means[L] = float(ic.mean()) if len(ic) else 0.0

    w = np.array([max(ic_means[L], 0.0) for L in lags], float)
    if w.sum() == 0: w[:] = 1.0
    w /= w.sum()

    eq = np.ones_like(w) / len(lags)
    w_final = (1 - shrink_lambda)*w + shrink_lambda*eq
    return dict(zip(lags, w_final))


def _cs_zscore_by_date(df_scores: pd.DataFrame) -> pd.DataFrame:
    """
    Cross-sectional z-score per date on column 'y_pred'.
    Input: index=date, cols=['ticker','y_pred']
    Output: same schema with z-scored y_pred.
    """
    def _z(g):
        s = g['y_pred'].astype(float)
        sd = float(s.std(ddof=0))
        if not np.isfinite(sd) or sd <= 1e-12:
            return pd.Series(0.0, index=s.index)  
        return (s - float(s.mean())) / sd
    return df_scores.groupby(level=0, group_keys=False).apply(lambda g: g.assign(y_pred=_z(g)))

def _make_df_asof_for_h(df_asof_base: pd.DataFrame, target_h: int) -> pd.DataFrame:
    """
    Reuse your existing df_asof (features already shifted 1D, leaky features dropped),
    but keep only rows where the H-target exists.
    """
    tcol = f"fwd_ret_{target_h}d"
    assert tcol in df_asof_base.columns, f"Missing target {tcol} in df_asof"
    return df_asof_base.dropna(subset=[tcol])

def _fitpredict_one_horizon(target_h: int,
                            model_kind: str = None,
                            model_params: dict | None = None
                           ) -> tuple[dict, pd.DataFrame, pd.DataFrame]:
    """
    Build the lag-ensemble for a given horizon H (5 or 21):
      - per-lag predictions via your WF runner
      - train-only IC weights across lags (if enabled)
      - per-date neutralization vs 1D return
    Returns:
      pred_by_lag_H : {L: DataFrame(date,[ticker,y_pred_L{L}])}
      pred_ens_raw  : DataFrame(date,[ticker, y_pred])  (pre-neutralization)
      PRED_BASE_H   : DataFrame(date,[ticker, y_pred])  (post 1D-neutralization)
    """
    tcol = f"fwd_ret_{target_h}d"
    dfH  = _make_df_asof_for_h(df_asof, target_h)

    # 1) per-lag predictions
    pred_by_lag_H = {}
    for L in progress(LAG_ENS, total=len(LAG_ENS), desc=f"Lags (H={target_h}d)"):
        dfL = _asof_extra_lag(dfH, feature_cols, extra=L-1)
        pipeH = _make_pipeline(model_kind or MODEL_KIND, model_params or MODEL_PARAMS)
        pL = fit_predict_wf_with_pipe(dfL, feature_cols, tcol, wf_windows_run, pipeH)

        pL = pL.sort_index().rename(columns={'y_pred': f'y_pred_L{L}'})
        pred_by_lag_H[L] = pL[['ticker', f'y_pred_L{L}']]

    # 2) align and weight across lags (train-only IC if enabled)
    dfs = [v.set_index(['ticker'], append=True) for v in pred_by_lag_H.values()]
    pred_ens = reduce(lambda a,b: a.join(b, how='inner'), dfs).sort_index()
    if USE_TRAIN_IC_WEIGHTS:
        W_lag = compute_train_ic_weights(
        df_asof=dfH, feature_cols=feature_cols, target_col=tcol, lags=LAG_ENS, df_prices=df,
        use_fast=True, alpha=1.0, neutralize_ret1=True, shrink_lambda=IC_SHRINK_LAMBDA,
        model_kind=(model_kind or MODEL_KIND), model_params=(model_params or MODEL_PARAMS)
    )

    else:
        W_lag = {L: 1.0/len(LAG_ENS) for L in LAG_ENS}

    pred_ens['y_pred'] = 0.0
    for L in LAG_ENS:
        pred_ens['y_pred'] += float(W_lag[L]) * pred_ens[f'y_pred_L{L}']

    pred_ens_raw = (pred_ens[['y_pred']]
                    .reset_index()
                    .set_index('date')[['ticker','y_pred']]
                    .sort_index())

    # 3) neutralize vs prior 1D return (same as 5D path for consistency)
    predN = (pred_ens_raw.reset_index().rename(columns={'index':'date'})
             .merge(
                 df.assign(ret_1d=df.groupby('ticker')['close'].pct_change(1))\
                   .assign(date=lambda x: x.index)\
                   .reset_index(drop=True)[['date','ticker','ret_1d']],
                 on=['date','ticker'], how='left'))

    def _res(g):
        x, y = g['ret_1d'].to_numpy(), g['y_pred'].to_numpy()
        m = np.isfinite(x) & np.isfinite(y)
        if m.sum() < 10: g['y_pred'] = np.nan; return g
        beta = np.dot(x[m], y[m]) / (np.dot(x[m], x[m]) + 1e-12)
        g.loc[m, 'y_pred'] = y[m] - beta * x[m]
        return g

    PRED_BASE_H = (predN.groupby('date', group_keys=False).apply(_res)
                   .dropna(subset=['y_pred'])
                   .set_index('date')[['ticker','y_pred']])

    # Dedup safety
    PRED_BASE_H = (PRED_BASE_H.reset_index().sort_values(['date','ticker'])
                   .drop_duplicates(['date','ticker'], keep='last')
                   .set_index('date')[['ticker','y_pred']])

    return pred_by_lag_H, pred_ens_raw, PRED_BASE_H



# 1) fit/predict for each as-of lag
pred_by_lag = {}
for L in progress(LAG_ENS, total=len(LAG_ENS), desc="Lag ensemble (main)"):
    dfL   = _asof_extra_lag(df_asof, feature_cols, extra=L-1)
    pipe0 = _make_pipeline(MODEL_KIND, MODEL_PARAMS)
    pL    = fit_predict_wf_with_pipe(dfL, feature_cols, TARGET, wf_windows_run, pipe0)
    pL    = pL.sort_index().rename(columns={'y_pred': f'y_pred_L{L}'})
    pred_by_lag[L] = pL[['ticker', f'y_pred_L{L}']]
    if USE_TQDM and '_tqdm' in globals() and _tqdm is not None:
        _tqdm.write(f"[ens] L={L} | rows={len(pL)} | dates={pL.index.nunique()} | span={pL.index.min()}→{pL.index.max()}")
    else:
        print(f"[ens] L={L} | rows={len(pL)} | dates={pL.index.nunique()} | span={pL.index.min()}→{pL.index.max()}")


# 2) align all lag predictions on (date, ticker)
dfs = [v.set_index(['ticker'], append=True) for v in pred_by_lag.values()]  # -> MultiIndex (date,ticker)
pred_ens = reduce(lambda a,b: a.join(b, how='inner'), dfs).sort_index()
lag_cols = [f'y_pred_L{L}' for L in LAG_ENS]

# 3) weights
if USE_TRAIN_IC_WEIGHTS:
    WEIGHTS = compute_train_ic_weights(
    df_asof, feature_cols, TARGET, LAG_ENS, df,
    neutralize_ret1=True, shrink_lambda=IC_SHRINK_LAMBDA,
    model_kind=MODEL_KIND, model_params=MODEL_PARAMS
    )
    print("[ens] Train-only IC weights:", {L: round(WEIGHTS[L],4) for L in LAG_ENS})

elif WEIGHT_METHOD == "ic" and 'compute_rank_ic' in globals():
    try:
        # original test-set IC weighting path
        px_tidy = df[['ticker','close']].copy()
        if getattr(px_tidy.index, "tz", None) is not None:
            px_tidy.index = px_tidy.index.tz_localize(None)
        px_tidy.index = pd.to_datetime(px_tidy.index).normalize()
        ic_means = {}
        for L in LAG_ENS:
            tmp = (pred_ens[[f'y_pred_L{L}']]
                   .rename(columns={f'y_pred_L{L}':'y_pred'})
                   .reset_index(level=1)
                   .rename_axis('date'))
            ic = compute_rank_ic(tmp, px_tidy, horizon=TARGET_HORIZONS[0], min_names=10, demean=True)
            ic_means[L] = float(ic.mean()) if len(ic) else 0.0
        w = np.array([max(ic_means[L], 0.0) for L in LAG_ENS], float)
        if w.sum() == 0: w[:] = 1.0
        w = w / w.sum()
        WEIGHTS = dict(zip(LAG_ENS, w))
        print("[ens] IC weights:", {L: round(WEIGHTS[L],4) for L in LAG_ENS})
    except Exception as e:
        print(f"[ens] IC weighting failed ({e}); using equal weights.")
        WEIGHTS = {L: 1.0/len(LAG_ENS) for L in LAG_ENS}

else:
    WEIGHTS = {L: 1.0/len(LAG_ENS) for L in LAG_ENS}

# 4) weighted average prediction
pred_ens['y_pred'] = 0.0
for L in LAG_ENS:
    pred_ens['y_pred'] += WEIGHTS[L] * pred_ens[f'y_pred_L{L}']

# reshape back to expected schema: index=date, columns ['ticker','y_pred']
pred_ens = (pred_ens[['y_pred']]
            .reset_index()
            .set_index('date')[['ticker','y_pred']]
            .sort_index())

print("Pred-ensemble rows:", len(pred_ens),
      "| unique dates:", pred_ens.index.nunique(),
      "| tickers:", pred_ens['ticker'].nunique())

# 5) per-date neutralization vs prior 1D return
ret1 = (df[['ticker','close']].groupby('ticker')['close'].pct_change(1).rename('ret_1d'))
ret1 = (df.assign(ret_1d=ret1)
          .assign(date=lambda x: x.index)
          .reset_index(drop=True)[['date','ticker','ret_1d']])

predN = (pred_ens.reset_index().rename(columns={'index':'date'})
         .merge(ret1, on=['date','ticker'], how='left'))

def _resid(g):
    x, y = g['ret_1d'].to_numpy(), g['y_pred'].to_numpy()
    m = np.isfinite(x) & np.isfinite(y)
    if m.sum() < 10:
        g['y_pred'] = np.nan
        return g
    beta = np.dot(x[m], y[m]) / (np.dot(x[m], x[m]) + 1e-12)
    g.loc[m, 'y_pred'] = y[m] - beta * x[m]
    return g

PRED_USE = (predN.groupby('date', group_keys=False).apply(_resid)
            .dropna(subset=['y_pred'])
            .set_index('date')[['ticker','y_pred']])
Lag ensemble (main):   0%|                                                                       | 0/2 [00:00<…
[ens] L=1 | rows=50766 | dates=854 | span=2021-08-01 00:00:00→2024-12-22 00:00:00
[ens] L=3 | rows=50760 | dates=854 | span=2021-08-01 00:00:00→2024-12-22 00:00:00
Train IC by lag:   0%|                                                                           | 0/2 [00:00<…
[ens] Train-only IC weights: {1: 0.5134, 3: 0.4866}
Pred-ensemble rows: 53574 | unique dates: 854 | tickers: 60

Horizon Ensemble (Train & Test)¶

This section extends the lag-ensemble to blend multiple target horizons (5D and 21D in this case).
Again, all weights are learned only on the training data of the first fold and applied to the test folds.

Key steps¶

  1. Per-horizon lag-ensembles

    • Run _fitpredict_one_horizon(5) and _fitpredict_one_horizon(21).
    • Each call internally:
      • Fits models on training windows (walk-forward).
      • Predicts on test folds.
      • Produces lag-ensemble predictions (PRED_BASE_H) neutralized vs prior 1D returns.
  2. Train-only horizon weights

    • For each horizon H ∈ {5, 21}:
      • Refit on the training span only.
      • Compute mean daily Spearman IC of horizon predictions vs realized returns.
    • Shrink horizon IC weights toward equal (HORIZON_IC_SHRINK_LAMBDA) to reduce variance.
    • Example: if 5D has stronger IC than 21D in training, it gets more weight.
  3. Blend across horizons

    • Convert both horizon predictions to cross-sectional z-scores per date (z5, z21).
    • Weighted combination:
      [ y_blend = w5 \cdot z5 + w{21} \cdot z21 ]
    • Output: PRED_BASE_HENS = horizon-ensemble prediction series.
  4. Final predictions

    • Set PRED_USE = PRED_BASE_HENS if horizon-ensemble is enabled.
    • Otherwise, fall back to the single-horizon (5D) lag-ensemble result.

Why this matters¶

  • Multiple horizons capture short-term (5D) and medium-term (21D) dynamics.
  • Train-only weighting ensures horizon blending is free of test-set leakage.
  • Z-scoring before blend puts different horizons on comparable scales.
In [16]:
# ----- 21D clone + Horizon ensemble (toggle) -----
if USE_HORIZON_ENSEMBLE:
    assert set(HORIZON_LIST).issubset(set(TARGET_HORIZONS)), "HORIZON_LIST must be in TARGET_HORIZONS"

    # 5D & 21D paths
    pred_by_lag_5,  pred_ens_5_raw,  PRED_BASE_5D  = _fitpredict_one_horizon(5,  MODEL_KIND, MODEL_PARAMS)
    pred_by_lag_21, pred_ens_21_raw, PRED_BASE_21D = _fitpredict_one_horizon(21, MODEL_KIND, MODEL_PARAMS)

    # --- Train-only horizon weights (IC on train; shrink toward equal) ---
    def _train_ic_for_horizon(H: int,
                              model_kind: str = MODEL_KIND,
                              model_params: dict | None = MODEL_PARAMS) -> float:
        """
        Train-time IC for a given horizon H using the SAME model variant as production.
        """
        tcol = f"fwd_ret_{H}d"
        dfH  = _make_df_asof_for_h(df_asof, H)

        # lag weights on TRAIN for this horizon, with same model variant
        Wlag = compute_train_ic_weights(
            df_asof=dfH, feature_cols=feature_cols, target_col=tcol, lags=LAG_ENS, df_prices=df,
            neutralize_ret1=True, shrink_lambda=IC_SHRINK_LAMBDA,
            model_kind=model_kind, model_params=model_params
        )

        # first train DataFrame from the first window
        parts = _split_wf(dfH, wf_windows_run[0])
        dfs   = [p for p in parts if isinstance(p, pd.DataFrame)]
        tr    = dfs[0] if dfs else pd.DataFrame()
        if tr.empty:
            return 0.0

        # template pipeline; clone per-lag
        pipe_tpl = _make_pipeline(model_kind, model_params)

        preds = []
        for L in LAG_ENS:
            dL = _asof_extra_lag(dfH, feature_cols, extra=L-1)
            dL = dL[(dL.index >= tr.index.min()) & (dL.index <= tr.index.max())]
            dL = dL.dropna(subset=[tcol])
            if dL.empty:
                continue

            pp = clone(pipe_tpl)
            X_tr, y_tr = dL[feature_cols], dL[tcol]
            pp.fit(X_tr, y_tr)
            yhat = pp.predict(X_tr)

            out = pd.DataFrame({"ticker": dL["ticker"].values,
                                f"y_pred_L{L}": yhat}, index=dL.index)
            out.index.name = "date"
            preds.append(out)

        if not preds:
            return 0.0

        # weight per-lag train preds with Wlag
        P = reduce(lambda a,b: a.join(b, how='inner'),
                   [p.set_index(['ticker'], append=True).sort_index() for p in preds])
        P['y_pred'] = 0.0
        for L in LAG_ENS:
            col = f'y_pred_L{L}'
            if col in P.columns:
                P['y_pred'] += float(Wlag[L]) * P[col]
        P = (P[['y_pred']].reset_index().set_index('date')[['ticker','y_pred']].sort_index())

        # neutralize vs. 1D return and compute IC on the train span
        Pn = _neutralize_vs_ret1_quick(P, df)
        ic_series = _compute_rank_ic_quick(Pn, df[['ticker','close']], horizon=H, min_names=10, demean=True)
        return float(ic_series.mean()) if len(ic_series) else 0.0

    # compute horizon weights on train
    ic_train_by_H = {}
    for H in progress(HORIZON_LIST, total=len(HORIZON_LIST), desc="H-weights train IC"):
        ic_train_by_H[H] = max(_train_ic_for_horizon(H), 0.0)

    w_raw = np.array([ic_train_by_H[H] for H in HORIZON_LIST], float)
    if w_raw.sum() == 0: 
        w_raw[:] = 1.0
    w_raw /= w_raw.sum()
    w_eq  = np.ones_like(w_raw) / len(HORIZON_LIST)
    wH = (1 - HORIZON_IC_SHRINK_LAMBDA) * w_raw + HORIZON_IC_SHRINK_LAMBDA * w_eq
    H_WEIGHTS = dict(zip(HORIZON_LIST, wH))

    H_W_SUMMARY = {
        'H_w5':  float(H_WEIGHTS.get(5,  np.nan)),
        'H_w21': float(H_WEIGHTS.get(21, np.nan)),
    }
    print("[h-ens] weights detail:", H_W_SUMMARY)
    print("[h-ens] Train-only horizon weights:", {int(k): round(v,4) for k,v in H_WEIGHTS.items()})

    # --- Per-date z-score + blend across horizons (outer join, fill mean=0) ---
    Z5  = _cs_zscore_by_date(PRED_BASE_5D).rename(columns={'y_pred':'z5'})
    Z21 = _cs_zscore_by_date(PRED_BASE_21D).rename(columns={'y_pred':'z21'})
    Z   = (Z5.reset_index().merge(Z21.reset_index(), on=['date','ticker'], how='outer'))
    Z[['z5','z21']] = Z[['z5','z21']].fillna(0.0)

    y_blend = H_WEIGHTS.get(5, 0)*Z['z5'] + H_WEIGHTS.get(21, 0)*Z['z21']
    PRED_BASE_HENS = (pd.DataFrame({'date': Z['date'], 'ticker': Z['ticker'], 'y_pred': y_blend})
                      .set_index('date')[['ticker','y_pred']].sort_index())

    PRED_USE = PRED_BASE_HENS.copy()

else:
    # keep existing 5D path result in PRED_USE 
    pass


7 - Risk-Control Overlays (Train/Test Predictions)¶

At this stage we apply risk controls to the test-set predictions PRED_USE.
The purpose is to align raw model outputs with realistic portfolio constraints.

Steps¶

  1. Deduplication & snapshot

    • Ensure only one (date, ticker) row remains per prediction (keep last if duplicates).
    • Save PRED_BASE = post–1D-return neutralized predictions, pre-beta-neutral overlay (useful for ablation tests).
  2. ADV eligibility filter (if enabled)

    • Build a per-date × ticker eligibility mask based on rolling average dollar volume (ADV).
    • Keeps only names above the specified liquidity percentile (ADV_PCT).
    • This prevents illiquid names from entering the portfolio.
  3. Beta-neutral overlay (if enabled)

    • Compute rolling betas of each ticker to the market.
    • Cross-sectionally regress scores on betas per date, and replace with residuals.
    • Strength toggle allows partial neutralization (e.g., 0.5 = half beta neutral, 1.0 = full).
  4. Final check

    • Deduplicate again after overlays.
    • Log the count of duplicate (date, ticker) rows as a debug sanity check.

Outputs¶

  • PRED_USE: final test-set predictions after risk controls.
  • PRED_BASE: snapshot before beta-neutralization, for comparison.
In [17]:
# --- Risk controls (toggled) ---

# 0) Ensure uniqueness before overlays & keep a pre-beta snapshot for ablations
def _dedup_scores(df_scores: pd.DataFrame) -> pd.DataFrame:
    return (df_scores.reset_index().rename(columns={'index':'date'})
            .sort_values(['date','ticker'])
            .drop_duplicates(['date','ticker'], keep='last')
            .set_index('date')[['ticker','y_pred']])

PRED_USE  = _dedup_scores(PRED_USE)   # post 1D-return neutralization
PRED_BASE = PRED_USE.copy()           # <-- snapshot PRE-BETA for ablations

# 1) ADV eligibility
if 'ELIG_ADV' not in globals():
    ELIG_ADV = None
if USE_ADV_ELIGIBILITY:
    ELIG_ADV = build_adv_eligibility(df, lookback=ADV_LOOKBACK_DAYS, pct=ADV_PCT)
    print(f"[elig] ADV lookback={ADV_LOOKBACK_DAYS}d, pct={ADV_PCT:.2f} → shape {ELIG_ADV.shape}")

# 2) Beta-neutral overlay on scores
if USE_BETA_NEUTRAL:
    BETAS = build_rolling_beta(df, lookback=BETA_LOOKBACK_DAYS)
    PRED_USE = neutralize_vs_beta(PRED_BASE, BETAS,
                                  min_names=MIN_XS_NAMES,
                                  strength=BETA_NEUTRAL_STRENGTH)
    PRED_USE = _dedup_scores(PRED_USE)
    print(f"[beta] applied beta-neutral overlay (lookback={BETA_LOOKBACK_DAYS}d, strength={BETA_NEUTRAL_STRENGTH})")

_dups = (PRED_USE.reset_index().rename(columns={'index':'date'})
         .duplicated(['date','ticker']).sum())
print(f"[debug] duplicate (date,ticker) rows in PRED_USE: {_dups}")
[elig] ADV lookback=60d, pct=0.20 → shape (2763, 60)
[debug] duplicate (date,ticker) rows in PRED_USE: 0


8 - Evaluation on Test Predictions (Backtest & Reporting)¶

We now evaluate the test-set predictions by running the execution-side backtest and reporting core portfolio stats.

Inputs

  • PRED_USE – final test predictions (after train→test WF pipeline, lag/horizon ensembles, and post-prediction overlays).
  • px – prices (close) for realized returns.
  • Risk controls: ADV eligibility, per-name caps, leverage, turnover costs.

Backtest logic

  • Rebalance per REBALANCE_FREQ, trade next day (as-of discipline), hold for HOLDING_PERIOD via rolling-mean overlap, apply transaction costs.
  • Output: daily portfolio return ret, turnover, and cumulative equity.

Reports

  • perf_stats: CAGR, annualized Vol, Sharpe, Max Drawdown, and sample size.
  • Diagnostics: average gross exposure (∑|w|), average number of names held, average daily turnover.
  • Plot: strategy equity curve (label reflects whether horizon-ensemble is used).
  • If ADV eligibility or per-name caps are toggled, summarize their effective constraints.
In [18]:
px = df[['ticker','close']].copy()

bt_main, w_held_main = backtest(
    PRED_USE, px,
    long_short=LONG_SHORT,
    top_q=TOP_QUANTILE,
    freq=REBALANCE_FREQ,
    holding_period=HOLDING_PERIOD,
    tcost_bps=TCOST_BPS,
    eligibility=(ELIG_ADV if USE_ADV_ELIGIBILITY else None),
    per_name_cap=(PER_NAME_CAP if USE_PER_NAME_CAP else None),
    gross_leverage=GROSS_LEVERAGE,
    debug=False
)


bt_5d     = bt_main
w_held_5d = w_held_main

curve_title = 'Strategy Equity Curve (horizon-ensemble)' if USE_HORIZON_ENSEMBLE else 'Strategy Equity Curve (5D target)'
print_label = 'Perf (h-ens)' if USE_HORIZON_ENSEMBLE else 'Perf (5D target)'

print(f"{print_label}:", perf_stats(bt_main['ret']))
print("Avg |w| per day:", float(w_held_main.abs().sum(axis=1).mean()))
print("Avg names held:", float((w_held_main.abs() > 1e-12).sum(axis=1).mean()))
print("Avg daily turnover:", float(bt_main['turnover'].mean()))
plot_equity_curve(bt_main['equity'], title=curve_title)

if USE_ADV_ELIGIBILITY and ELIG_ADV is not None:
    elig_frac = ELIG_ADV.mean(axis=1)
    print(f"[elig] mean eligible names/day: {float(ELIG_ADV.sum(axis=1).mean()):.1f}")
    print(f"[elig] mean eligible fraction: {elig_frac.mean():.2%}  | min: {elig_frac.min():.2%}")

if USE_PER_NAME_CAP and PER_NAME_CAP:
    print(f"[cap] per-name cap active at {PER_NAME_CAP:.2%} (long_short={LONG_SHORT})")
Perf (5D target): {'CAGR': 0.09731340486377915, 'Vol': 0.03370629225265959, 'Sharpe': 2.772443375609235, 'MaxDD': -0.018407797326819852, 'N': 849}
Avg |w| per day: 0.46857950530035364
Avg names held: 31.288574793875146
Avg daily turnover: 0.07831330977620733
[elig] mean eligible names/day: 45.5
[elig] mean eligible fraction: 75.83%  | min: 0.00%
[cap] per-name cap active at 2.00% (long_short=True)

In this section, we take the final test-set predictions and evaluate execution-side overlays to see how each affects performance.

What this does¶

  • run_variant(): evaluates a labeled variant by optionally applying:
    • Beta neutralization on scores (test-time only), with beta_strength ∈ [0,1].
    • ADV eligibility mask (test-time only) to skip illiquid names.
    • Per-name cap passthrough to the backtest.
  • Runs the standard backtest on the modified test predictions and returns CAGR, Vol, Sharpe, MaxDD, N.

Variants compared¶

  • Base (no overlays) — raw test predictions after 1D-return neutralization.
  • ADV only — apply liquidity mask.
  • Beta 1.0 only / Beta 0.5 only — residualize scores vs rolling betas with full/half strength.
  • ADV + Beta — combine both overlays (using global BETA_NEUTRAL_STRENGTH).

Why this matters¶

These ablations isolate the execution-side controls and quantify their impact on realized performance without contaminating train/test splits.

In [19]:
def run_variant(label,
                pred_base: pd.DataFrame,
                adv: pd.DataFrame | None = None,
                betas: pd.DataFrame | None = None,
                beta_strength: float = 1.0,
                per_name_cap: float | None = None):
    """
    Evaluate a variant: optional ADV eligibility and optional beta neutralization (with strength).
    pred_base must be (index=date, cols=['ticker','y_pred']) AFTER 1D-return neutralization.
    """
    P = pred_base
    if betas is not None and beta_strength > 0:
        P = neutralize_vs_beta(P, betas, min_names=MIN_XS_NAMES, strength=beta_strength)
        P = (P.reset_index().sort_values(['date','ticker'])
               .drop_duplicates(['date','ticker'], keep='last')
               .set_index('date')[['ticker','y_pred']])

    bt, _ = backtest(P, df[['ticker','close']],
                     long_short=LONG_SHORT, top_q=TOP_QUANTILE,
                     freq=REBALANCE_FREQ, holding_period=HOLDING_PERIOD,
                     tcost_bps=TCOST_BPS, eligibility=adv,
                     per_name_cap=per_name_cap,  # pass cap through
                     debug=False)
    
    s = perf_stats(bt['ret'])
    s.update(dict(Label=label))
    return s

# small ablation table around PRED_BASE (pre-beta) and BETAS/ELIG_ADV
_base = PRED_BASE
_cap  = (PER_NAME_CAP if globals().get('USE_PER_NAME_CAP', False) else None)
_adv  = (ELIG_ADV if globals().get('USE_ADV_ELIGIBILITY', False) else None)

rows = []
rows.append(run_variant("Base (no overlays)", _base, adv=None, betas=None, per_name_cap=_cap))
rows.append(run_variant("ADV only",          _base, adv=_adv, betas=None, per_name_cap=_cap))
if 'BETAS' in globals():
    rows.append(run_variant("Beta 1.0 only", _base, adv=None, betas=BETAS, beta_strength=1.0, per_name_cap=_cap))
    rows.append(run_variant("Beta 0.5 only", _base, adv=None, betas=BETAS, beta_strength=0.5, per_name_cap=_cap))
    rows.append(run_variant("ADV + Beta",    _base, adv=_adv, betas=BETAS, beta_strength=BETA_NEUTRAL_STRENGTH, per_name_cap=_cap))

abl = (pd.DataFrame(rows)
         .set_index('Label')[['CAGR','Vol','Sharpe','MaxDD','N']]
         .sort_values('Sharpe', ascending=False))
display(abl.round(3))
CAGR Vol Sharpe MaxDD N
Label
Base (no overlays) 0.0970 0.0340 2.7720 -0.0180 849
ADV only 0.0970 0.0340 2.7720 -0.0180 849


9 - Rank-IC Diagnostics (Test-Set Predictions Only)¶

We assess the cross-sectional predictive power of our final test-set predictions PRED_USE using Rank-IC (daily Spearman correlation between scores and future returns).

What’s computed¶

  • compute_rank_ic()
    Tidy-merges predictions with realized returns and computes daily Spearman IC, with optional demeaning per date to remove market drift.

    • horizon > 0: forward returns over h days, shifted to avoid leakage.
    • horizon < 0: backward check (sanity/look-ahead probe).
    • min_names: minimum cross-section size per day; else IC is NaN.
  • True IC on primary horizon (H = TARGET_HORIZONS[0])
    Reports mean IC, IC std, t-stat ((\bar{IC}/\sigma(IC))\sqrt{N}), and number of days.

  • Rolling IC
    60-day rolling mean IC plot to visualize temporal stability.

  • Cross-sectional dispersion sanity check
    Warns if daily score variance is zero (Spearman undefined).

  • +1D lag probe
    Shifts predictions by one day before evaluation to ensure the signal is not relying on same-day artifacts.

  • Shuffle tests (null baselines)
    (a) Cross-section shuffle per date — randomizes ranks within each day (IC should ≈ 0).
    (b) Calendar shuffle — permutes the return dates while keeping predictions fixed (IC should ≈ 0).

  • Lead/Lag IC profile
    IC across horizons −15…+15 days:

    • Negative lags: guard against look-ahead.
    • Positive lags: profile of true predictive horizon; peak location indicates timing.

Why this matters¶

These diagnostics quantify out-of-sample ranking skill, check for leakage or timing artifacts, and reveal where the signal’s horizon is strongest.

In [20]:
# Rank-IC diagnostics

px_tidy = df[['ticker','close']].copy()
if getattr(px_tidy.index, "tz", None) is not None:
    px_tidy.index = px_tidy.index.tz_localize(None)
px_tidy.index = pd.to_datetime(px_tidy.index).normalize()


def compute_rank_ic(
    pred_df: pd.DataFrame,
    df_prices: pd.DataFrame,
    horizon: int = 5,
    min_names: int = 10,
    demean: bool = True
) -> pd.Series:
    # predictions → tidy
    p = (pred_df[['ticker', 'y_pred']]
         .rename(columns={'y_pred': 'score'})
         .rename_axis('date')
         .reset_index())
    p['date'] = pd.to_datetime(p['date']).dt.normalize()
    p = p[['date','ticker','score']]

    # prices → tidy + normalized index
    px = df_prices[['ticker','close']].copy()
    if getattr(px.index, "tz", None) is not None:
        px.index = px.index.tz_localize(None)
    px.index = pd.to_datetime(px.index).normalize()

    # realized returns per ticker
    if horizon > 0:
        fwd = px.groupby('ticker')['close'].pct_change(horizon).shift(-horizon)
    elif horizon < 0:
        H = abs(horizon)
        fwd = px.groupby('ticker')['close'].pct_change(H)
    else:
        fwd = pd.Series(0.0, index=px.index)

    real = (px.assign(fwd=fwd)
              .assign(date=lambda x: x.index)
              .reset_index(drop=True)[['date','ticker','fwd']])

    # merge preds with realized forward returns
    ic_df = p.merge(real, on=['date','ticker'], how='left')

    if demean:
        ic_df['fwd']   = ic_df.groupby('date')['fwd'].transform(lambda x: x - x.mean())
        ic_df['score'] = ic_df.groupby('date')['score'].transform(lambda x: x - x.mean())

    def _daily_ic(g):
        g = g.dropna(subset=['score','fwd'])
        if len(g) < min_names:
            return np.nan
        return g[['score','fwd']].corr(method='spearman').iloc[0,1]

    ic = ic_df.groupby('date', sort=True).apply(_daily_ic).dropna()
    ic.index.name = 'date'
    ic.name = f'IC_{horizon}d'
    return ic


# --- True IC on the primary horizon ---
H = TARGET_HORIZONS[0]
ic_true = compute_rank_ic(PRED_USE, px_tidy, horizon=H, min_names=10, demean=True)

N = len(ic_true)
ic_mean = float(ic_true.mean()) if N else np.nan
ic_std  = float(ic_true.std())  if N else np.nan
t_stat  = (ic_mean / ic_std * np.sqrt(N)) if (N and ic_std > 0) else np.nan

print(f"IC mean: {ic_mean:.4f} | IC std: {ic_std:.4f} | t-stat: {t_stat:.2f} | N days: {N}")
plot_rolling_ic(ic_true, window=60, title=f"60D Rolling Rank IC ({H}D horizon)")

# Sanity: CS dispersion
cs_std = PRED_USE.groupby(level=0)['y_pred'].std()
if (cs_std.fillna(0) == 0).all():
    print("All days have zero cross-sectional std; Spearman IC will be NaN.")

# +1D lag probe
pred_lag1 = (PRED_USE.copy()
             .groupby('ticker', group_keys=False)
             .apply(lambda g: g.assign(y_pred=g['y_pred'].shift(1))))
ic_lag1 = compute_rank_ic(pred_lag1, px_tidy, horizon=H, min_names=10, demean=True)
print("IC (+1D lag) mean:", float(ic_lag1.mean()) if len(ic_lag1) else np.nan)

# Shuffle tests (seeded)
rng = np.random.default_rng(42)

# (a) cross-section shuffle per date
pred_shuf = (PRED_USE.rename(columns={'y_pred':'score'})
             .rename_axis('date').reset_index()
             .groupby('date', group_keys=False)
             .apply(lambda g: g.assign(score=rng.permutation(g['score'].values)))
             .set_index('date')
             .rename(columns={'score':'y_pred'}))
ic_shuf = compute_rank_ic(pred_shuf, px_tidy, horizon=H, min_names=10, demean=True)
print("IC (shuffle xs) mean:", float(ic_shuf.mean()) if len(ic_shuf) else np.nan)

# (b) calendar shuffle: permute dates of returns
u_dates = px_tidy.index.unique()
perm_map = dict(zip(u_dates, rng.permutation(u_dates)))
px_perm = px_tidy.copy()
px_perm.index = px_perm.index.map(perm_map)
ic_cal = compute_rank_ic(PRED_USE, px_perm, horizon=H, min_names=10, demean=True)
print("IC (calendar shuffle) mean:", float(ic_cal.mean()) if len(ic_cal) else np.nan)



_ = lead_lag_ic_profile_plotly(
        PRED_USE, px_tidy, horizons=range(-15, 16),
        compute_rank_ic_fn=compute_rank_ic
)
IC mean: 0.0934 | IC std: 0.2053 | t-stat: 13.18 | N days: 840
IC (+1D lag) mean: 0.0916633364153255
IC (shuffle xs) mean: 0.0038697698595677203
IC (calendar shuffle) mean: 0.006396083685574156
Lead/Lag IC:   0%|                                                                              | 0/31 [00:00<…
In [21]:
# --- Annual breakdown (perf + IC) ---

# DataFrame of perf stats by calendar year
rows = []
for y, s in bt_5d['ret'].groupby(bt_5d.index.year):
    d = perf_stats(s)         # -> dict
    d['year'] = int(y)
    rows.append(d)
yr_perf = pd.DataFrame(rows).set_index('year').sort_index()

# Mean daily IC per year 
if 'ic_true' in globals() and isinstance(ic_true, pd.Series) and len(ic_true):
    yr_ic = ic_true.groupby(ic_true.index.year).mean()
    yr_ic.name = 'IC_mean'
    yr_ic = yr_ic.to_frame()
    yr_ic.index.name = 'year'
else:
    yr_ic = pd.DataFrame(columns=['IC_mean'])

annual = yr_perf.join(yr_ic, how='left')
display(annual.round(3))
CAGR Vol Sharpe MaxDD N IC_mean
year
2021 0.0130 0.0310 0.4300 -0.0140 101 0.0420
2022 0.0830 0.0340 2.3820 -0.0170 251 0.0850
2023 0.1200 0.0370 3.0470 -0.0180 250 0.1040
2024 0.1260 0.0310 3.8810 -0.0160 247 0.1140

Lag-Ensemble Diagnostics (Test-Only)¶

We evaluate each AS-OF lag separately and compare them to the final ensemble on the test folds.

What this does¶

1) Per-lag IC means

  • For each lag L, optionally residualize scores vs prior 1D return (neutralize_single=True), then compute daily Spearman Rank-IC on the test set at horizon H.
  • Plot a bar chart of per-lag mean ICs.

2) Per-lag backtests vs ensemble

  • Backtest each single-lag signal and the already-neutralized ensemble under the same execution settings (rebalance freq, top-q selection, costs, eligibility, caps).
  • Overlay equity curves (lags vs ensemble) and print a performance table (CAGR, Vol, Sharpe, MaxDD) with the per-lag IC for context.

Inputs / outputs¶

  • Inputs:
    pred_by_lag = {L: (date,[ticker,y_pred_L{L}])}, pred_ensemble = (date,[ticker,y_pred]), prices df_prices, horizon H.
  • Outputs:
    s (Series of per-lag mean IC), perf_df (per-lag perf metrics), eq_df (equity curves per lag and ensemble).

Note: This is a test-only analysis — no retraining occurs here. It helps attribute where the ensemble’s edge comes from and whether any single lag dominates.

In [22]:
# --- Lag-ensemble diagnostics: per-lag ICs + equity curves -------------------

def _px_tidy(df_prices):
    px = df_prices[['ticker','close']].copy()
    if getattr(px.index, "tz", None) is not None:
        px.index = px.index.tz_localize(None)
    px.index = pd.to_datetime(px.index).normalize()
    return px

def _neutralize_vs_ret1(pred_df, df_prices):
    """Per-date residualize y_pred against prior 1D close-to-close return."""
    ret1 = df_prices[['ticker','close']].groupby('ticker')['close'].pct_change(1).rename('ret_1d')
    ret1 = (df_prices.assign(ret_1d=ret1)
            .assign(date=lambda x: x.index)
            .reset_index(drop=True)[['date','ticker','ret_1d']])
    sc = pred_df.reset_index().rename(columns={'index':'date'})
    m  = sc.merge(ret1, on=['date','ticker'], how='left')

    def _resid_per_date(g):
        x, y = g['ret_1d'].to_numpy(), g['y_pred'].to_numpy()
        mask = np.isfinite(x) & np.isfinite(y)
        if mask.sum() < 10:
            g['y_pred'] = np.nan
            return g
        beta = np.dot(x[mask], y[mask]) / (np.dot(x[mask], x[mask]) + 1e-12)
        g.loc[mask, 'y_pred'] = y[mask] - beta * x[mask]
        return g

    out = (m.groupby('date', group_keys=False).apply(_resid_per_date)
             .dropna(subset=['y_pred'])
             .set_index('date')[['ticker','y_pred']])
    return out

def lag_ens_diagnostics(pred_by_lag: dict,
                        pred_ensemble: pd.DataFrame,
                        df_prices: pd.DataFrame,
                        horizon: int,
                        neutralize_single: bool = True,
                        title_suffix: str = ""):
    """
    pred_by_lag: {L: DataFrame(index=date, cols=['ticker', f'y_pred_L{L}'])}
    pred_ensemble: DataFrame(index=date, cols=['ticker','y_pred']) (already neutralized)
    df_prices: wide tidy prices (the 'df' you use everywhere)
    """
    TOP_Q   = globals().get('TOP_QUANTILE', 0.2)
    FREQ    = globals().get('REBALANCE_FREQ', 'W-FRI')
    HP      = globals().get('HOLDING_PERIOD', 5)
    TCOST   = globals().get('TCOST_BPS', 0)

    px = _px_tidy(df_prices)
    lags = sorted(pred_by_lag.keys())

    # --- 1) per-lag IC means
    ic_means = {}
    preds_use = {}
    for L in progress(lags, total=len(lags), desc="Per-lag IC"):
        pL = pred_by_lag[L].copy()
        col = f'y_pred_L{L}'
        if col not in pL.columns:
            col = 'y_pred'
        pL = pL.rename(columns={col: 'y_pred'})
        if neutralize_single:
            pL = _neutralize_vs_ret1(pL, df_prices)
        preds_use[L] = pL
        ic = compute_rank_ic(pL, px, horizon=horizon, min_names=10, demean=True)
        ic_means[L] = float(ic.mean()) if len(ic) else np.nan

    # --- Plot per-lag ICs
    s = bar_ic_means(ic_means, title=f'Per-lag mean IC (H={horizon}d){(" " + title_suffix) if title_suffix else ""}')

    # --- 2) Backtest each lag and the ensemble; plot equity overlay
    eq = {}
    rows = []
    # individual lags
    for L in progress(lags, total=len(lags), desc="Per-lag backtests"):
        bt, _ = backtest(
        preds_use[L], df_prices[['ticker','close']],
        long_short=False, top_q=TOP_Q, freq=FREQ,
        holding_period=HP, tcost_bps=TCOST,
        eligibility=(ELIG_ADV if globals().get('USE_ADV_ELIGIBILITY', False) else None),
        per_name_cap=(PER_NAME_CAP if globals().get('USE_PER_NAME_CAP', False) else None),
        debug=False
        )
        eq[f"L{L}"] = bt['equity'].rename(f"L{L}")
        stats = perf_stats(bt['ret'])
        stats.update(dict(model=f"L{L}", IC_mean=s.loc[L]))
        rows.append(stats)

    # ensemble (already neutralized upstream)
    btE, _ = backtest(
    pred_ensemble, df_prices[['ticker','close']],
    long_short=False, top_q=TOP_Q, freq=FREQ,
    holding_period=HP, tcost_bps=TCOST,
    eligibility=(ELIG_ADV if globals().get('USE_ADV_ELIGIBILITY', False) else None),
    per_name_cap=(PER_NAME_CAP if globals().get('USE_PER_NAME_CAP', False) else None),
    debug=False
    )
    eq["Ens"] = btE['equity'].rename("Ens")

    eq_df = pd.concat(eq, axis=1).dropna(how='all')
    multi_equity_plot(eq, title=f"Equity curves: individual lags vs ensemble{(' ' + title_suffix) if title_suffix else ''}")


    perf_df = pd.DataFrame(rows).set_index('model').sort_values('Sharpe', ascending=False)
    display(perf_df.round(3))

    return s, perf_df, eq_df
In [23]:
ic_by_lag, perf_by_lag, eq_df = lag_ens_diagnostics(
    pred_by_lag,
    PRED_USE,
    df,
    horizon=TARGET_HORIZONS[0],
    neutralize_single=True,
    title_suffix="(h-ens)" if USE_HORIZON_ENSEMBLE else "(5D)"
)
Per-lag IC:   0%|                                                                                | 0/2 [00:00<…
Per-lag backtests:   0%|                                                                         | 0/2 [00:00<…
CAGR Vol Sharpe MaxDD N IC_mean
model
L1 0.0700 0.0480 1.4240 -0.0510 849 0.0910
L3 0.0640 0.0490 1.2940 -0.0540 849 0.0870

Diagnostic: Residual Beta Exposure (Test Predictions)¶

We quantify any remaining market beta exposure in the test-set predictions after overlays.

Method

  • For each date, regress scores y_pred cross-sectionally on rolling betas: [ \text{beta_slope}_t \;=\; \frac{\sumi (x{it}-\bar xt)(y{it}-\bar y_t)}{\sumi (x{it}-\bar xt)^2} ] where (x{it}) is the rolling beta of ticker (i) and (y_{it}) is the prediction score.
  • Requires at least MIN_XS_NAMES names per day; otherwise returns NaN.

Interpretation

  • Mean of beta_slope ≈ 0 ⇒ scores are neutral to market beta (as intended).
  • Positive (negative) mean ⇒ residual long (short) tilt to high-beta names.
  • You can also plot a rolling mean to check stability over time.

Outputs

  • beta_slope: daily series of estimated beta loading of the scores.
  • Printed mean value for a quick sanity check.
In [24]:
def beta_exposure_series(pred_df, betas):
    s = (pred_df.reset_index()
         .pivot_table(index='date', columns='ticker', values='y_pred', aggfunc='last'))
    b = betas.reindex(s.index).reindex(columns=s.columns)
    expo = []
    for dt in s.index:
        y = s.loc[dt].values; x = b.loc[dt].values
        m = np.isfinite(y) & np.isfinite(x)
        if m.sum() < MIN_XS_NAMES: expo.append(np.nan); continue
        y0 = y[m] - y[m].mean(); x0 = x[m] - x[m].mean()
        expo.append(float(np.dot(x0, y0) / (np.dot(x0, x0) + 1e-12)))
    return pd.Series(expo, index=s.index, name='beta_slope')

if USE_BETA_NEUTRAL:
    beta_slope = beta_exposure_series(PRED_USE, BETAS)
    print("Beta exposure mean (should be ~0):", float(beta_slope.mean()))


10 - Model Variant Sweep (Train → Test, Parallelized)¶

This section trains on walk-forward training folds and evaluates on their test folds for a set of model variants (e.g., Ridge, ElasticNet, HGBT). For each variant we build horizon-ensemble predictions (5D + 21D), optionally apply beta overlay, and backtest — then compare performance and equity curves.

Workflow (per variant)¶

1) Train/Test predictions (WF)

  • _fitpredict_one_horizon(5, kind, params) and _fitpredict_one_horizon(21, kind, params):
    • Train model on each fold’s training slice.
    • Test on the subsequent test month.
    • Internally includes lag-ensemble and 1D-return neutralization.

2) Train-only horizon weights (cached)

  • _train_ic_for_horizon(...) computes mean daily Spearman IC on the training span only for each horizon.
  • Weights are shrunk toward equal by HORIZON_IC_SHRINK_LAMBDA.
  • Results cached by (H, model kind, params) to avoid recomputation.

3) Blend horizons on test predictions

  • Per-date z-score each horizon’s scores, then combine with the train-only weights to get the horizon-ensemble test scores.

4) Optional beta overlay (test-time)

  • If USE_BETA_NEUTRAL=True, residualize scores vs precomputed rolling betas (BETAS_FOR_SWEEP), then deduplicate to one (date, ticker) score.

5) Backtest & metrics (test-only)

  • Run backtest(...) with the same execution settings (rebalance, top-q, costs, eligibility, per-name cap, leverage).
  • Collect CAGR, Vol, Sharpe, MaxDD, N_days, AvgTurnover, wall_sec.

Parallelization & safety¶

  • Up to N_JOBS_VARIANTS variants run in parallel (threading backend); per-estimator threads limited via threadpool_limits to prevent oversubscription.
  • Errors are caught per variant and reported in the results table without stopping the sweep.

Outputs¶

  • variant_tbl: sortable comparison of metrics per variant (with params and runtime).
  • Equity overlay plot across variants for quick visual comparison.

Note: All weighting (IC for horizons) uses training data only. Backtests and reported metrics are strictly out-of-sample test results.

In [25]:
# --- 10. Model variant sweep -------------------
from functools import lru_cache
from joblib import Parallel, delayed
from threadpoolctl import threadpool_limits
import time, json, os


# How many variants to run concurrently (1 to keep serial + tqdm)
N_JOBS_VARIANTS = max(1, min(os.cpu_count() - 1, 4))
# Prevent thread oversubscription inside each worker
LIMIT_THREADS_PER_EST = 1

if USE_VARIANT_SWEEP:
    VARIANTS = [
        ("ridge",      {"alpha": 1.0}),
        ("elasticnet", {"alpha": 0.5, "l1_ratio": 0.2}),
        ("elasticnet", {"alpha": 1.0, "l1_ratio": 0.1}),
        ("hgbt",       {"max_depth": 3, "learning_rate": 0.05, "max_iter": 250}),
    ]
else:
    VARIANTS = [(PROD_MODEL_KIND, PROD_MODEL_PARAMS)]

# precompute betas once for the sweep if overlay is ON
BETAS_FOR_SWEEP = build_rolling_beta(df, lookback=BETA_LOOKBACK_DAYS) if USE_BETA_NEUTRAL else None

def _variant_label(kind: str, params: dict) -> str:
    keys = ("alpha","l1_ratio","epsilon","max_depth","learning_rate","max_iter")
    short = ",".join(f"{k}={params[k]}" for k in keys if k in params)
    return f"{kind}({short})" if short else kind


# Memorize the train-IC by (H, model kind, params)
@lru_cache(maxsize=None)
def _train_ic_for_horizon_cached(H: int, kind: str, params_key: str) -> float:
    params = json.loads(params_key)
    if not USE_HORIZON_ENSEMBLE or '_train_ic_for_horizon' not in globals():
        return 0.0
    val = _train_ic_for_horizon(H, model_kind=kind, model_params=params)
    return float(max(val, 0.0))

def run_variant(kind, params):
    t0 = time.perf_counter()
    label = _variant_label(kind, params)
    params_key = json.dumps(params, sort_keys=True)
    try:
        with threadpool_limits(limits=LIMIT_THREADS_PER_EST):
            # horizon-ensemble predictions with the given model
            _, _, P5  = _fitpredict_one_horizon(5,  kind, params)
            P21 = None
            if USE_HORIZON_ENSEMBLE:
                _, _, P21 = _fitpredict_one_horizon(21, kind, params)

        if USE_HORIZON_ENSEMBLE and P21 is not None:
            # horizon weights on train (uses cached IC)
            ic_train_by_H = {H: _train_ic_for_horizon_cached(H, kind, params_key) for H in HORIZON_LIST}
            w_raw = np.array([ic_train_by_H[H] for H in HORIZON_LIST], float)
            if w_raw.sum()==0: 
                w_raw[:] = 1.0
            w_raw /= w_raw.sum()
            w_eq  = np.ones_like(w_raw)/len(HORIZON_LIST)
            wH    = (1 - HORIZON_IC_SHRINK_LAMBDA)*w_raw + HORIZON_IC_SHRINK_LAMBDA*w_eq
            H_WEIGHTS = dict(zip(HORIZON_LIST, wH))

            # per-date z-score + blend across horizons
            Z5  = _cs_zscore_by_date(P5).rename(columns={'y_pred':'z5'})
            Z21 = _cs_zscore_by_date(P21).rename(columns={'y_pred':'z21'})
            Z   = (Z5.reset_index().merge(Z21.reset_index(), on=['date','ticker'], how='outer'))
            Z[['z5','z21']] = Z[['z5','z21']].fillna(0.0)
            y_blend = H_WEIGHTS.get(5,0)*Z['z5'] + H_WEIGHTS.get(21,0)*Z['z21']
            PRED = (pd.DataFrame({'date':Z['date'],'ticker':Z['ticker'],'y_pred':y_blend})
                    .set_index('date')[['ticker','y_pred']].sort_index())
        else:
             # 5D only path (PROFILE P1) — P5 is already neutralized vs 1D inside _fitpredict_one_horizon
            PRED = P5.copy()
        # de-dup safety
        PRED = (PRED.reset_index().sort_values(['date','ticker'])
            .drop_duplicates(['date','ticker'], keep='last')
            .set_index('date')[['ticker','y_pred']])

        # beta overlay (reuses precomputed betas)
        if USE_BETA_NEUTRAL and BETAS_FOR_SWEEP is not None:
            PRED = neutralize_vs_beta(PRED, BETAS_FOR_SWEEP,
                                      min_names=MIN_XS_NAMES,
                                      strength=BETA_NEUTRAL_STRENGTH)
            PRED = (PRED.reset_index().sort_values(['date','ticker'])
                    .drop_duplicates(['date','ticker'], keep='last')
                    .set_index('date')[['ticker','y_pred']])

        # backtest on current risk toggles
        bt, _ = backtest(
            PRED, df[['ticker','close']],
            long_short=LONG_SHORT,
            top_q=TOP_QUANTILE,
            freq=REBALANCE_FREQ,
            holding_period=HOLDING_PERIOD,
            tcost_bps=TCOST_BPS,
            eligibility=(ELIG_ADV if USE_ADV_ELIGIBILITY else None),
            per_name_cap=(PER_NAME_CAP if USE_PER_NAME_CAP else None),
            gross_leverage=GROSS_LEVERAGE,
            debug=False
        )
        stats = perf_stats(bt['ret'])
        stats.update(
                label=label,
                kind=kind,
                params=params_key,
                wall_sec=time.perf_counter() - t0,
                N_days=int(bt['ret'].shape[0]),
                AvgTurnover=float(bt['turnover'].mean()),
        )
        return stats, bt['equity'].rename(label)  
    
    except Exception as e:
        import traceback
        tb = traceback.format_exc()
        stats = dict(CAGR=np.nan, Vol=np.nan, Sharpe=np.nan, MaxDD=np.nan, N=0,
                     label=label, kind=kind, params=params_key, wall_sec=np.nan,
                     error=str(e))
        print(f"[variant ERROR] {label}\n{tb}")
        empty_eq = pd.Series(dtype=float, name=label)
        return stats, empty_eq

# --- Execute ----------
rows, curves = [], {}

if N_JOBS_VARIANTS > 1 and len(VARIANTS) > 1:
    print(f"[variants] running {len(VARIANTS)} models in parallel "
          f"(n_jobs={N_JOBS_VARIANTS}, per-task threads={LIMIT_THREADS_PER_EST})")

    tasks = (delayed(run_variant)(kind, params) for kind, params in VARIANTS)
    with joblib_progress(len(VARIANTS), "Model variants"):
        results = Parallel(n_jobs=N_JOBS_VARIANTS, backend="threading")(tasks)

    for s, eq in results:
        rows.append(s); curves[eq.name] = eq
else:
    # Ridge-only (or single variant) path
    for kind, params in VARIANTS:
        s, eq = run_variant(kind, params)
        rows.append(s); curves[eq.name] = eq

variant_tbl = (pd.DataFrame(rows)
               .set_index('label')
               .sort_values('Sharpe', ascending=False)
)
base_cols = ['kind','params','CAGR','Vol','Sharpe','MaxDD','N']
extra_cols = [c for c in ['N_days','AvgTurnover','wall_sec','error'] if c in variant_tbl.columns]
variant_tbl = variant_tbl.reindex(columns=base_cols + extra_cols)
display(variant_tbl.round(3))

title = "Equity by model kind (h-ens)" if len(curves) > 1 else f"Equity: {next(iter(curves))}"
multi_equity_plot(curves, title=title)
[variant ERROR] ridge
Traceback (most recent call last):
  File "C:\Users\nadiy\AppData\Local\Temp\ipykernel_11216\1929571131.py", line 46, in run_variant
    with threadpool_limits(limits=LIMIT_THREADS_PER_EST):
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\nadiy\anaconda3\Lib\site-packages\threadpoolctl.py", line 171, in __init__
    self._original_info = self._set_threadpool_limits()
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\nadiy\anaconda3\Lib\site-packages\threadpoolctl.py", line 268, in _set_threadpool_limits
    modules = _ThreadpoolInfo(prefixes=self._prefixes,
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\nadiy\anaconda3\Lib\site-packages\threadpoolctl.py", line 340, in __init__
    self._load_modules()
  File "C:\Users\nadiy\anaconda3\Lib\site-packages\threadpoolctl.py", line 373, in _load_modules
    self._find_modules_with_enum_process_module_ex()
  File "C:\Users\nadiy\anaconda3\Lib\site-packages\threadpoolctl.py", line 485, in _find_modules_with_enum_process_module_ex
    self._make_module_from_path(filepath)
  File "C:\Users\nadiy\anaconda3\Lib\site-packages\threadpoolctl.py", line 515, in _make_module_from_path
    module = module_class(filepath, prefix, user_api, internal_api)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\nadiy\anaconda3\Lib\site-packages\threadpoolctl.py", line 606, in __init__
    self.version = self.get_version()
                   ^^^^^^^^^^^^^^^^^^
  File "C:\Users\nadiy\anaconda3\Lib\site-packages\threadpoolctl.py", line 646, in get_version
    config = get_config().split()
             ^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'split'

kind params CAGR Vol Sharpe MaxDD N wall_sec error
label
ridge ridge {} NaN NaN NaN NaN 0 NaN 'NoneType' object has no attribute 'split'

11 - Parameter sweeps¶

In [26]:
# 11.Holding Period (HP), Rebalance Freq, Top-Q
import itertools

HP_LIST    = [5, 10]
FREQ_LIST  = ['W-FRI', 'M']
TOPQ_LIST  = [0.10, 0.20, 0.30]
TCOST_FOR_SWEEP = TCOST_BPS   # costs fixed here; cost sweep below

def run_sweep_hpfreq_topq(pred_df, prices_df,
                          hp_list=HP_LIST, freq_list=FREQ_LIST, topq_list=TOPQ_LIST,
                          tcost_bps=TCOST_FOR_SWEEP, long_short=LONG_SHORT):
    rows, eq = [], {}

    grid = list(itertools.product(hp_list, freq_list, topq_list))
    for hp, fq, tq in progress(grid, total=len(grid),
                               desc="Sweep HP/Freq/TopQ"):
        bt, wheld = backtest(
            pred_df, prices_df[['ticker','close']],
            long_short=long_short,
            top_q=tq,
            freq=fq,
            holding_period=hp,
            tcost_bps=tcost_bps,
            eligibility=(ELIG_ADV if USE_ADV_ELIGIBILITY else None),
            per_name_cap=(PER_NAME_CAP if USE_PER_NAME_CAP else None),
            gross_leverage=GROSS_LEVERAGE,     
            debug=False
        )
        stats = perf_stats(bt['ret'])
        label = f"HP{hp}-{fq}-Q{int(tq*100)}"

        stats.update(dict(
            label=label, HP=hp, Freq=fq, TopQ=tq,
            AvgTurnover=float(bt['turnover'].mean()),
            AvgNames=float((wheld.abs() > 1e-12).sum(axis=1).mean())
        ))

        eq[label] = bt['equity'].rename(label)
        rows.append(stats)

    res = pd.DataFrame(rows).set_index('label').sort_values('Sharpe', ascending=False)
    eq_df = pd.concat(eq, axis=1).dropna(how='all')
    return res, eq_df


sweep_tbl, sweep_eq = run_sweep_hpfreq_topq(PRED_USE, df)
display(sweep_tbl.round(3))

# Sharpe vs Turnover scatter 
import plotly.graph_objects as go
def scatter_sharpe_vs_turnover(sweep_df, title="Sharpe vs Avg Daily Turnover"):
    s = sweep_df.copy()
    s['TopQ%'] = (s['TopQ']*100).astype(int)
    fig = go.Figure()
    for fq in s['Freq'].unique():
        ss = s[s['Freq']==fq]
        fig.add_trace(go.Scatter(
            x=ss['AvgTurnover'], y=ss['Sharpe'],
            mode='markers+text', name=fq,
            text=ss.index, textposition='top center',
            marker=dict(size=10),
            hovertemplate=("Label=%{text}<br>Sharpe=%{y:.2f}"
                           "<br>Turnover=%{x:.3f}<br>HP=%{customdata[0]} "
                           "<br>TopQ=%{customdata[1]}%"),
            customdata=np.c_[ss['HP'], ss['TopQ%']]
        ))
    fig.update_layout(title=title, xaxis_title="Avg daily turnover (|Δw|1)",
                      yaxis_title="Sharpe", width=900, height=380)
    fig.show()

scatter_sharpe_vs_turnover(sweep_tbl)

# Overlay equity curves for the top 3 combos by Sharpe
top_labels = sweep_tbl.head(3).index.tolist()
multi_equity_plot(
    {k: sweep_eq[k] for k in top_labels},
    title=f"Equity: Top 3 sweep configs {'(h-ens)' if USE_HORIZON_ENSEMBLE else '(5D)'}"
)
Sweep HP/Freq/TopQ:   0%|                                                                       | 0/12 [00:00<…
CAGR Vol Sharpe MaxDD N HP Freq TopQ AvgTurnover AvgNames
label
HP10-M-Q20 0.1340 0.0320 3.9920 -0.0250 832 10 M 0.2000 0.0210 23.8040
HP10-M-Q30 0.1650 0.0390 3.9460 -0.0380 832 10 M 0.3000 0.0290 34.2160
HP5-M-Q20 0.1340 0.0330 3.8460 -0.0240 832 5 M 0.2000 0.0210 21.5360
HP5-M-Q30 0.1580 0.0400 3.6800 -0.0390 832 5 M 0.3000 0.0290 31.7600
HP10-W-FRI-Q20 0.1150 0.0330 3.3470 -0.0170 849 10 W-FRI 0.2000 0.0460 37.5680
HP10-W-FRI-Q30 0.1340 0.0390 3.2820 -0.0250 849 10 W-FRI 0.3000 0.0580 49.4710
HP10-M-Q10 0.0720 0.0230 3.0260 -0.0180 832 10 M 0.1000 0.0130 12.7810
HP5-M-Q10 0.0720 0.0240 2.8760 -0.0160 832 5 M 0.1000 0.0130 11.3700
HP10-W-FRI-Q10 0.0650 0.0220 2.8550 -0.0120 849 10 W-FRI 0.1000 0.0270 20.9630
HP5-W-FRI-Q20 0.0970 0.0340 2.7720 -0.0180 849 5 W-FRI 0.2000 0.0780 31.2890
HP5-W-FRI-Q30 0.1070 0.0400 2.5480 -0.0270 849 5 W-FRI 0.3000 0.1010 43.7360
HP5-W-FRI-Q10 0.0560 0.0230 2.3650 -0.0130 849 5 W-FRI 0.1000 0.0460 16.5370
In [27]:
# 11.1 Cost sweep (keep HP/Freq/TopQ at chosen baseline from §8)

COSTS = [0, 5, 10, 25]

def cost_sweep(pred_df, prices_df,
               hp=HOLDING_PERIOD, fq=REBALANCE_FREQ, tq=TOP_QUANTILE, long_short=LONG_SHORT):
    rows, eq = [], {}
    for bps in progress(COSTS, total=len(COSTS), desc="Cost sweep"):
        bt, _ = backtest(
            pred_df, prices_df[['ticker','close']],
            long_short=long_short,
            top_q=tq,
            freq=fq,
            holding_period=hp,
            tcost_bps=bps,
            eligibility=(ELIG_ADV if USE_ADV_ELIGIBILITY else None),
            per_name_cap=(PER_NAME_CAP if USE_PER_NAME_CAP else None),
            gross_leverage=GROSS_LEVERAGE,   
            debug=False
        )
        s = perf_stats(bt['ret']); s.update(dict(tcost_bps=bps))
        rows.append(s)
        eq[f"{bps}bps"] = bt['equity'].rename(f"{bps}bps")
    return pd.DataFrame(rows).set_index('tcost_bps').sort_index(), eq


cost_tbl, cost_eq = cost_sweep(PRED_USE, df,
                               hp=HOLDING_PERIOD,
                               fq=REBALANCE_FREQ,
                               tq=TOP_QUANTILE,
                               long_short=LONG_SHORT)
display(cost_tbl.round(3))

# Plot cost equity overlays
multi_equity_plot(cost_eq, title=f"Equity by transaction costs (bps) {'(h-ens)' if USE_HORIZON_ENSEMBLE else '(5D)'}")


import plotly.graph_objects as go
fig = go.Figure([go.Bar(x=[str(i) for i in cost_tbl.index], y=cost_tbl['Sharpe'].values)])
mode = "(h-ens)" if USE_HORIZON_ENSEMBLE else "(5D)"
fig.update_layout(title=f"Sharpe by tcost (bps) {mode}",
                  xaxis_title="bps", yaxis_title="Sharpe",
                  width=650, height=320)
fig.show()
Cost sweep:   0%|                                                                                | 0/4 [00:00<…
CAGR Vol Sharpe MaxDD N
tcost_bps
0 0.1410 0.0340 3.9470 -0.0150 849
5 0.1300 0.0340 3.6540 -0.0160 849
10 0.1190 0.0340 3.3600 -0.0170 849
25 0.0870 0.0340 2.4790 -0.0200 849

12 - Save artifacts & summary¶

In [28]:
from datetime import datetime

# --- Checks  ---
assert 'PRED_USE' in locals(), "PRED_USE (neutralized ensemble) not defined yet."
assert 'bt_5d'     in locals(), "bt_5d not defined yet."
assert 'w_held_5d' in locals(), "w_held_5d not defined yet."

RUNS_DIR = Path("results/equity_daily")
RUNS_DIR.mkdir(parents=True, exist_ok=True)

run_id  = datetime.now().strftime("%Y%m%d-%H%M%S")
out_dir = RUNS_DIR / run_id
out_dir.mkdir(parents=True, exist_ok=True)

# --- Save core artifacts ---
PRED_USE.to_parquet(out_dir / "pred_5d.parquet")  
bt_5d.to_parquet(out_dir / "bt_5d.parquet")
w_held_5d.to_parquet(out_dir / "weights_held.parquet")



# --- Save config / metadata for full reproducibility ---
metadata = {
    "run_id": run_id,
    "start_date": str(START_DATE),
    "end_date": str(END_DATE),
    "target_horizons": TARGET_HORIZONS,
    "max_h": MAX_H,
    "train_years": TRAIN_YEARS,
    "step_months": STEP_MONTHS,
    "embargo_days": EMBARGO_DAYS,
    "rebalance_freq": REBALANCE_FREQ,
    "long_short": LONG_SHORT,
    "top_quantile": TOP_QUANTILE,
    "holding_period": HOLDING_PERIOD,
    "tcost_bps": TCOST_BPS,
    "random_state": RANDOM_STATE,
    "n_tickers": int(df['ticker'].nunique()),
    "n_rows_df": int(len(df)),
    "n_features": int(len(feature_cols)),
}
with open(out_dir / "config.json", "w") as f:
    json.dump(metadata, f, indent=2)

# --- Save feature list, tickers, and windows used ---
pd.Series(feature_cols, name="feature").to_csv(out_dir / "features.csv", index=False)
pd.Series(sorted(df['ticker'].unique()), name="ticker").to_csv(out_dir / "universe.csv", index=False)

# Store window boundaries for audit
wf_tbl = pd.DataFrame(
    [(w.train_start, w.train_end, w.test_start, w.test_end) for w in wf_windows_run],
    columns=["train_start","train_end","test_start","test_end"]
)
wf_tbl.to_csv(out_dir / "wf_windows.csv", index=False)

# --- Performance & IC summary ---
perf = perf_stats(bt_5d['ret'])
summary = {
    "CAGR":  float(perf["CAGR"]) if perf["CAGR"]==perf["CAGR"] else np.nan,
    "Vol":   float(perf["Vol"])  if perf["Vol"]==perf["Vol"]   else np.nan,
    "Sharpe":float(perf["Sharpe"]) if perf["Sharpe"]==perf["Sharpe"] else np.nan,
    "MaxDD": float(perf["MaxDD"]) if perf["MaxDD"]==perf["MaxDD"] else np.nan,
    "N_days":int(perf["N"]),
}

# IC
if "ic_true" in locals() and isinstance(ic_true, pd.Series) and len(ic_true):
    summary.update({
        "IC_mean": float(ic_true.mean()),
        "IC_std":  float(ic_true.std()),
        "IC_t":    float(ic_true.mean()/ic_true.std()*np.sqrt(len(ic_true))) if ic_true.std() > 0 else np.nan,
        "IC_N":    int(len(ic_true)),
    })

pd.DataFrame([summary]).to_csv(out_dir / "summary.csv", index=False)

# --- Save an equity curve PNG ---
plt.figure(figsize=(10,4))
title_str = 'Strategy Equity Curve (horizon-ensemble)' if USE_HORIZON_ENSEMBLE else 'Strategy Equity Curve (5D target)'
bt_5d['equity'].plot(title=title_str)
plt.tight_layout()
plt.savefig(out_dir / "equity_curve.png", dpi=150)
plt.close()

# --- update a 'latest' pointer ---
latest_link = RUNS_DIR / "latest"
try:
    if latest_link.exists() or latest_link.is_symlink():
        latest_link.unlink()
    latest_link.symlink_to(out_dir.resolve())
except Exception:
    with open(RUNS_DIR / "LATEST_RUN.txt", "w") as f:
        f.write(str(out_dir.resolve()))

print(f"Artifacts saved → {out_dir}")
Artifacts saved → results\equity_daily\20250902-193628

13 - Reload helper¶

In [29]:
def load_run(run_path: Path):
    preds   = pd.read_parquet(run_path / "pred_5d.parquet")
    bt      = pd.read_parquet(run_path / "bt_5d.parquet")
    weights = pd.read_parquet(run_path / "weights_held.parquet")
    with open(run_path / "config.json") as f:
        cfg = json.load(f)
    return preds, bt, weights, cfg

# Example:
# preds, bt, weights, cfg = load_run(Path("results/equity_daily/latest"))